项目实训第三天

本文最后更新于:2021年7月23日 晚上


MapReduce

组件

InputFormat - 输入格式

  1. InputFormat发生在Mapper之前,需要先读取数据,然后将读取出来的数据交给Mapper处理,所以InputFormat读取出来的数据是什么样子,那么Mapper就是什么样子
  2. 如果不指定,那么MapReduce默认使用的是TextInputFormat,TextInputFormat在读取数据的时候,默认是按行读取数据,读取出来的数据的键是数据的字节偏移量,值是读取的这一行数据
  3. 在MapReduce中,只有BZip2对应的压缩文件可以切片,其他的压缩文件都是不可切片的
  4. 自定义输入格式:需要定义一个类继承FileInputFormat,覆盖createRecordReader方法
  5. 多源输入:在MapReduce中允许指定多个不同的输入路径,同时处理多个文件。而这多个文件的格式可以不一样,可以利用Mapper来统一格式,最终交给Reducer来进行处理

实现

authinput(Package)

数据(score2.txt):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tom
math 90
english 98
jary
math 78
english 87
rose
math 87
english 90
bob
math 67
english 87
alex
math 59
english 80
helen
math 79
english 60
  1. AuthInputFormat.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    package com.quosimodo.authinput;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;

    import java.io.IOException;
    import java.net.URI;

    // 泛型表示读取出来的数据类型
    public class AuthInputFormat extends FileInputFormat<Text, Text> {
    // 需要在这种方法中定义流来读取数据
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new AuthReader();
    }
    }

    class AuthReader extends RecordReader<Text, Text> {

    private LineReader reader;
    private Text key;
    private Text value;
    private float len;
    private float pos = 0;

    // 在初始化过程中被调用一次
    // 在实际过程中,会利用这个方法来构建一个新的流用于读取数据
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
    // 转成文件切片
    FileSplit fileSplit = (FileSplit) split;
    // 获取切片对应的路径
    Path path = fileSplit.getPath();
    // 获取Split的大小
    len = fileSplit.getLength();
    // 连接HDFS
    FileSystem fs = FileSystem.get(
    URI.create(path.toString()), context.getConfiguration());
    // 获取输入流
    FSDataInputStream in = fs.open(path);
    // 获取到的是一个字节流,但是要处理的数据是一个字符文件
    // 考虑转化成字符流
    reader = new LineReader(in);
    }

    // 判断是否有下一个键值对要交给Mapper处理
    // 试着读取数据,如果读到了数据,那么说明有数据交给Mapper处理
    @Override
    public boolean nextKeyValue() throws IOException {
    // 定义变量用于存储数据
    key = new Text();
    value = new Text();
    Text tmp = new Text();
    // 读取数据
    // reader会将读取到的一行数据放到tmp中
    if (reader.readLine(tmp) <= 0) return false;
    key.set(tmp.toString());
    pos += tmp.getLength();
    // 读取第二行数据
    if (reader.readLine(tmp) <= 0) return false;
    value.set(tmp.toString());
    pos += tmp.getLength();
    // 读取第三行数据
    if (reader.readLine(tmp) <= 0) return false;
    value.append(" ".getBytes(), 0, 1);
    value.append(tmp.getBytes(), 0, tmp.getLength());
    pos += tmp.getLength();
    // key = tom
    // value = math 78 english 87
    return true;
    }

    @Override
    public Text getCurrentKey() {
    return key;
    }

    @Override
    public Text getCurrentValue() {
    return value;
    }

    // 获取执行进度
    @Override
    public float getProgress() {
    return pos / len;
    }

    @Override
    public void close() throws IOException {
    if (reader != null)
    reader.close();
    }
    }

  2. AuthMapper.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    package com.quosimodo.authinput;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class AuthMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    // key = tom
    // value = math 78 english 87
    // 拆分值
    String[] arr = value.toString().split(" ");
    context.write(key, new IntWritable(Integer.parseInt(arr[1])));
    context.write(key, new IntWritable(Integer.parseInt(arr[3])));
    }
    }

  3. AuthReducer.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    package com.quosimodo.authinput;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;

    import java.io.IOException;
    import java.net.URI;

    // 泛型表示读取出来的数据类型
    public class AuthInputFormat extends FileInputFormat<Text, Text> {
    // 需要在这种方法中定义流来读取数据
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new AuthReader();
    }
    }

    class AuthReader extends RecordReader<Text, Text> {

    private LineReader reader;
    private Text key;
    private Text value;
    private float len;
    private float pos = 0;

    // 在初始化过程中被调用一次
    // 在实际过程中,会利用这个方法来构建一个新的流用于读取数据
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
    // 转成文件切片
    FileSplit fileSplit = (FileSplit) split;
    // 获取切片对应的路径
    Path path = fileSplit.getPath();
    // 获取Split的大小
    len = fileSplit.getLength();
    // 连接HDFS
    FileSystem fs = FileSystem.get(
    URI.create(path.toString()), context.getConfiguration());
    // 获取输入流
    FSDataInputStream in = fs.open(path);
    // 获取到的是一个字节流,但是要处理的数据是一个字符文件
    // 考虑转化成字符流
    reader = new LineReader(in);
    }

    // 判断是否有下一个键值对要交给Mapper处理
    // 试着读取数据,如果读到了数据,那么说明有数据交给Mapper处理
    @Override
    public boolean nextKeyValue() throws IOException {
    // 定义变量用于存储数据
    key = new Text();
    value = new Text();
    Text tmp = new Text();
    // 读取数据
    // reader会将读取到的一行数据放到tmp中
    if (reader.readLine(tmp) <= 0) return false;
    key.set(tmp.toString());
    pos += tmp.getLength();
    // 读取第二行数据
    if (reader.readLine(tmp) <= 0) return false;
    value.set(tmp.toString());
    pos += tmp.getLength();
    // 读取第三行数据
    if (reader.readLine(tmp) <= 0) return false;
    value.append(" ".getBytes(), 0, 1);
    value.append(tmp.getBytes(), 0, tmp.getLength());
    pos += tmp.getLength();
    // key = tom
    // value = math 78 english 87
    return true;
    }

    @Override
    public Text getCurrentKey() {
    return key;
    }

    @Override
    public Text getCurrentValue() {
    return value;
    }

    // 获取执行进度
    @Override
    public float getProgress() {
    return pos / len;
    }

    @Override
    public void close() throws IOException {
    if (reader != null)
    reader.close();
    }
    }

  4. AuthDriver.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.quosimodo.authinput;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class AuthDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(AuthDriver.class);
    job.setMapperClass(AuthMapper.class);
    job.setReducerClass(AuthReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(AuthInputFormat.class);

    FileInputFormat.addInputPath(job,
    new Path("hdfs://hadoop:9000/txt/score2.txt"));
    FileOutputFormat.setOutputPath(job,
    new Path("hdfs://hadoop:9000/result/auth_input"));

    job.waitForCompletion(true);
    }

    }

运行输出:


authoutput(Package)

数据(score2.txt)

  1. AuthOutputFormat.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    package com.quosimodo.authoutput;

    import com.quosimodo.authinput.AuthInputFormat;
    import com.quosimodo.authinput.AuthMapper;
    import com.quosimodo.authinput.AuthReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class AuthOutputDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    conf.set("mapreduce.output.textoutputformat.separator", ",");
    Job job = Job.getInstance(conf);

    job.setJarByClass(AuthOutputDriver.class);
    job.setMapperClass(AuthMapper.class);
    job.setReducerClass(AuthReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(AuthInputFormat.class);
    job.setOutputFormatClass(AuthOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path("hdfs://hadoop:9000/txt/score2.txt"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/result/auth_output"));

    job.waitForCompletion(true);

    }
    }

  2. AuthDriver.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.quosimodo.authinput;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class AuthDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(AuthDriver.class);
    job.setMapperClass(AuthMapper.class);
    job.setReducerClass(AuthReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(AuthInputFormat.class);

    FileInputFormat.addInputPath(job,
    new Path("hdfs://hadoop:9000/txt/score2.txt"));
    FileOutputFormat.setOutputPath(job,
    new Path("hdfs://hadoop:9000/result/auth_input"));

    job.waitForCompletion(true);
    }
    }


运行输出:


multipleinput(Package)

数据(score2.txt)

  1. ScoreMapper.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.quosimodo.multipleinput;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    // 处理score.txt
    // Bob 90 64 92 83 82 95
    public class ScoreMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] arr = value.toString().split(" ");
    Text name = new Text(arr[0]);
    for (int i = 1; i < arr.length; i++) {
    context.write(name, new IntWritable(Integer.parseInt(arr[i])));
    }
    }
    }

  2. MultipleDriver.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.quosimodo.multipleinput;

    import com.quosimodo.authinput.AuthInputFormat;
    import com.quosimodo.authinput.AuthMapper;
    import com.quosimodo.authinput.AuthReducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class MultipleDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(MultipleDriver.class);
    job.setReducerClass(AuthReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // 多源输入
    MultipleInputs.addInputPath(job, new Path("hdfs://hadoop:9000/txt/score.txt"),
    TextInputFormat.class, ScoreMapper.class);
    MultipleInputs.addInputPath(job, new Path("hdfs://hadoop:9000/txt/score2.txt"),
    AuthInputFormat.class, AuthMapper.class);
    // FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.233.133:9000/result/multiple"));
    // 要将最终的结果打成一个压缩包
    FileOutputFormat.setCompressOutput(job, true);
    // 指定压缩包的存储位置
    FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/result/compress"));
    // 指定压缩格式
    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    job.waitForCompletion(true);
    }

    }

运行输出:


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!