Project Training, Day 2

Last updated: July 23, 2021 (evening)


MapReduce

Introduction

  1. When MapReduce processes data, it first cuts the input into splits. A split is a logical division (logical split): it is really dividing up the workload, not physically copying the data.
  2. Once the workload has been divided, each split is handed to one MapTask for processing (a hedged illustration of split sizing follows this list).
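
A hedged illustration of the point above (this is an addition, not part of the original notes; the package and class name are made up, the FileInputFormat helpers are standard): because a split is only a logical bound on the work, its size can be tuned per job. With the defaults, one split roughly corresponds to one HDFS block, so a file spanning three blocks is handled by three MapTasks.

    package com.quosimodo.splitdemo;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import java.io.IOException;

    // Illustration only: bound the logical split size for a job.
    public class SplitSizeDemo {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // No data is moved here - these calls only change how the workload is divided.
            FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);  // at least 64 MB per split
            FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024); // at most 128 MB per split
        }
    }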

Components

Serialization

  1. MapReduce needs to serialize the data it moves around, and it provides its own serialization mechanism: a class whose objects need to be serialized must implement the Writable interface.
  2. During serialization MapReduce does not allow fields to be null (see the sketch after this list).
  3. Example: compute the total traffic used by each person (file: flow.txt).
  4. Exercise: compute each person's average score (file: score.txt).
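
A minimal sketch of the null rule in point 2, using a made-up Person class (not part of the exercises): DataOutput.writeUTF throws a NullPointerException when given null, which is one concrete reason reference-type fields should be initialized, for example to an empty string.

    package com.quosimodo.demo;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical Writable with a String field. If name were left null,
    // out.writeUTF(name) would throw a NullPointerException during serialization.
    public class Person implements Writable {

        private String name = ""; // initialize, never leave null

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readUTF();
        }
    }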

Partitioning

  1. Partitioning is used to classify the data.
  2. In practice, you classify the data according to the requirements at hand by assigning records to different partitions.
  3. Example: compute each person's total traffic, grouped by region (file: flow.txt).
  4. MapReduce numbers the partitions, starting from 0 and counting up.
  5. Each category corresponds to its own ReduceTask: with N categories you need N ReduceTasks (see the sketch after this list).
  6. Exercise: compute each person's total score, grouped by month (directory: score).
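
A generic sketch of the numbering rule in points 4 and 5, with a made-up two-way partitioner (the flow.txt answer below is the full three-region version): getPartition returns a number starting at 0, and the Driver must set that many ReduceTasks, otherwise the map side typically fails with an illegal-partition error.

    package com.quosimodo.demo;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical two-way split: "beijing" keys go to partition 0, everything
    // else to partition 1. The Driver has to call job.setNumReduceTasks(2)
    // so that both partition numbers have a ReduceTask to receive them.
    public class TwoWayPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            return "beijing".equals(key.toString()) ? 0 : 1;
        }
    }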

Sorting

  1. MapReduce automatically sorts the data by its key; by default it uses the natural ordering.
  2. If you need a custom ordering, the class used in the key position must implement Comparable; since the data also has to be serialized, it implements WritableComparable (see the sketch after this list).
  3. Example: sort the total traffic from the earlier exercise in descending order.
  4. Exercise: sort by month in ascending order and, within the same month, by profit in descending order (file: profit.txt).
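
A minimal sketch of point 2 with a made-up key class (the full answers are below): the object used as the map output key implements WritableComparable, and the shuffle sorts by its compareTo. Integer.compare is used here instead of subtraction because subtraction can overflow for large values.

    package com.quosimodo.demo;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical key sorted ascending by month and, within a month, descending by profit.
    public class MonthProfitKey implements WritableComparable<MonthProfitKey> {

        private int month;
        private int profit;

        @Override
        public int compareTo(MonthProfitKey o) {
            int r = Integer.compare(this.month, o.month);   // month ascending
            if (r != 0) return r;
            return Integer.compare(o.profit, this.profit);  // profit descending
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(month);
            out.writeInt(profit);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.month = in.readInt();
            this.profit = in.readInt();
        }
    }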

Answers

Serialization


Total traffic per person

Data (flow.txt):

18642971356 shanghai david 4132 4121
15936842654 shanghai peter 236 7566
13012945687 beijing helen 4152 5321
13548627458 beijing alex 452 759
15432697314 hangzhou jack 4558 7474
13012945687 beijing helen 587 2463
15012665465 shanghai holly 864 995
13012945687 beijing helen 4232 7435
18642971356 shanghai david 7434 2744
15012665465 shanghai holly 3247 756
18642971356 shanghai david 7641 7541
15432697314 hangzhou jack 764 131
13548627458 beijing alex 711 5424
18642971356 shanghai david 521 7654
13012945687 beijing helen 445 2387
13548627458 beijing alex 5854 3877
15936842654 shanghai peter 5223 9645
15936842654 shanghai peter 74875 8541
18642971356 shanghai david 584 476
15732654952 beijing bruce 557 585
15012665465 shanghai holly 478 6598
15432697314 hangzhou jack 4855 7885
15936842654 shanghai peter 566 8942
15732654952 beijing bruce 4532 4698
18023643218 hangzhou adair 578 5875
15732654952 beijing bruce 5464 763
15432697314 hangzhou jack 574 553
18023643218 hangzhou adair 954 4310
15936842654 shanghai peter 7324 5456
18023643218 hangzhou adair 4778 8746
15012665465 shanghai holly 5656 4131
15732654952 beijing bruce 632 7432
13548627458 beijing alex 414 4125
18023643218 hangzhou adair 7456 432
13548627458 beijing alex 365 780
15012665465 shanghai holly 5245 5521
15432697314 hangzhou jack 5747 4563
15732654952 beijing bruce 882 8648

Upload it to the /txt directory on HDFS.

  1. Data storage class:

    package com.quosimodo.serialflow;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Flow implements Writable {

        private int upFlow;   // upstream traffic
        private int downFlow; // downstream traffic

        public int getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }

        public int getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }

        // Serialization
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(upFlow);
            out.writeInt(downFlow);
        }

        // Deserialization - read the fields in the same order they were written
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readInt();
            this.downFlow = in.readInt();
        }
    }
  2. Mapper class:

    package com.quosimodo.serialflow;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class SerialFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
        // key:     input key - the byte offset of the current line
        // value:   input value - the line of text to process
        // context: job context, used to emit the result
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 18642971356 shanghai david 4132 4121
            // Split the fields
            String[] arr = value.toString().split(" ");
            // Wrap the traffic information
            Flow f = new Flow();
            f.setUpFlow(Integer.parseInt(arr[3]));
            f.setDownFlow(Integer.parseInt(arr[4]));
            // Emit: key = name, value = Flow
            context.write(new Text(arr[2]), f);
        }
    }
  3. Reducer class:

    package com.quosimodo.serialflow;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    // The input key/value types must match the Mapper's output types
    public class SerialFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
        // key:    a key emitted by the Mapper
        // values: all values emitted by the Mapper for this key, wrapped in an iterable
        @Override
        protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
            // Total traffic for this person
            int sum = 0;
            for (Flow value : values) {
                sum += value.getUpFlow() + value.getDownFlow();
            }
            context.write(key, new IntWritable(sum));
        }
    }
  4. Driver class (configures and launches the job):

    package com.quosimodo.serialflow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class SerialFlowDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            // Load the configuration
            Configuration conf = new Configuration();
            // Create the job
            Job job = Job.getInstance(conf);

            // Set the entry class
            job.setJarByClass(SerialFlowDriver.class);
            // Set the Mapper
            job.setMapperClass(SerialFlowMapper.class);
            // Set the Reducer
            job.setReducerClass(SerialFlowReducer.class);

            // Output types of the Mapper
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Flow.class);

            // Output types of the Reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Input path
            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/txt/flow.txt"));
            // Output path - it must not exist yet
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/serial_flow"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

    Here hadoop is the hostname mapped in the hosts file set up earlier; it points to the VM's IP.

After the job runs, the corresponding result appears under the /result directory on HDFS.


Average score per person

Data (score.txt):

Bob 90 64 92 83 82 95
Alex 64 63 68 86 84 81
Grace 57 86 24 84 92
Henry 39 79 78 76 84 87 90
Adair 88 82 64 95
Chad 66 74 37 78 80
Colin 64 86 74 74 76
Eden 71 85 43 85 71
Grover 99 86 43 89

Upload it to the /txt directory on HDFS.

  1. Data storage class (holds each subject's scores):

    package com.quosimodo.average_score;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class Score implements Writable {

        private List<Integer> scores;

        public List<Integer> getScores() {
            return scores;
        }

        public void setScores(List<Integer> scores) {
            this.scores = scores;
        }

        // Serialization: write the list length first, then each score
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(scores.size());
            for (Integer score : scores) {
                out.writeInt(score);
            }
        }

        // Deserialization: read the length back, then read that many scores
        @Override
        public void readFields(DataInput in) throws IOException {
            int len = in.readInt();
            this.scores = new ArrayList<>(len);
            for (int i = 0; i < len; i++) {
                this.scores.add(in.readInt());
            }
        }
    }

  2. Mapper class:

    package com.quosimodo.average_score;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class SerialScoreMapper
            extends Mapper<LongWritable, Text, Text, Score> {
        // key:     input key - the byte offset of the current line
        // value:   input value - the line of text to process
        // context: job context, used to emit the result
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Bob 90 64 92 83 82 95
            // Split the fields
            String[] arr = value.toString().split(" ");
            // Wrap the scores
            Score s = new Score();
            List<Integer> scores = new ArrayList<>(arr.length - 1);
            for (int i = 1; i < arr.length; i++) {
                scores.add(Integer.parseInt(arr[i]));
            }
            s.setScores(scores);
            // Emit: key = name, value = Score
            context.write(new Text(arr[0]), s);
        }
    }

  3. Reducer class:

    package com.quosimodo.average_score;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.text.DecimalFormat;
    import java.util.List;

    // The input key/value types must match the Mapper's output types
    public class SerialScoreReducer extends Reducer<Text, Score, Text, Text> {
        // key:    a key emitted by the Mapper
        // values: all values emitted by the Mapper for this key, wrapped in an iterable
        @Override
        protected void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
            // Sum of all scores and number of subjects
            long sum = 0;
            int count = 0;
            for (Score value : values) {
                List<Integer> scores = value.getScores();
                for (Integer score : scores) {
                    sum += score;
                    count++;
                }
            }
            // Use floating-point division; integer division would always print .00 decimals
            double avg = sum / (double) count;
            // Keep two decimal places
            DecimalFormat df = new DecimalFormat("0.00");
            String str = df.format(avg);
            context.write(key, new Text(str));
        }
    }

  4. Driver class (configures and launches the job):

    package com.quosimodo.average_score;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class SerialScoreDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            // Load the configuration
            Configuration conf = new Configuration();
            // Create the job
            Job job = Job.getInstance(conf);

            // Set the entry class
            job.setJarByClass(SerialScoreDriver.class);
            // Set the Mapper
            job.setMapperClass(SerialScoreMapper.class);
            // Set the Reducer
            job.setReducerClass(SerialScoreReducer.class);

            // Output types of the Mapper
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Score.class);

            // Output types of the Reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Input path
            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/txt/score.txt"));
            // Output path - it must not exist yet
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/average_score"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

    Here hadoop is the hostname mapped in the hosts file set up earlier; it points to the VM's IP.

After the job runs, the corresponding result appears under the /result directory on HDFS.
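
A quick sanity check of the corrected division in the Reducer above, using Bob's six scores from score.txt (this check is an addition, not part of the original notes):

    import java.text.DecimalFormat;

    public class AverageCheck {
        public static void main(String[] args) {
            int[] bob = {90, 64, 92, 83, 82, 95};   // Bob's row in score.txt
            long sum = 0;
            for (int s : bob) sum += s;              // 506
            double avg = sum / (double) bob.length;  // 506 / 6.0 = 84.333...
            // Same formatting as the Reducer: prints 84.33
            System.out.println(new DecimalFormat("0.00").format(avg));
        }
    }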


Partitioning

Total traffic per person, grouped by region

Data (flow.txt): the same data as in the serialization exercise.

Upload it to the /txt directory on HDFS.

  1. Data storage class:

    package com.quosimodo.partflow;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Flow implements Writable {

        private String city = "";
        private int upFlow;
        private int downFlow;

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public int getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }

        public int getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(city);
            out.writeInt(upFlow);
            out.writeInt(downFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.city = in.readUTF();
            this.upFlow = in.readInt();
            this.downFlow = in.readInt();
        }
    }

  2. Partitioner class:

    package com.quosimodo.partflow;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Classify the records by region/city
    public class CityPartitioner extends Partitioner<Text, Flow> {
        @Override
        public int getPartition(Text text, Flow flow, int numPartitions) {
            // Get the city
            String city = flow.getCity();
            // Assign a partition number based on the city
            if (city.equals("beijing")) return 0;
            else if (city.equals("shanghai")) return 1;
            else return 2;
        }
    }

  3. Mapper class:

    package com.quosimodo.partflow;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 15432697314 hangzhou jack 4558 7474
            // Split the fields
            String[] arr = value.toString().split(" ");
            // Wrap the Flow object
            Flow f = new Flow();
            f.setCity(arr[1]);
            f.setUpFlow(Integer.parseInt(arr[3]));
            f.setDownFlow(Integer.parseInt(arr[4]));
            // Emit: key = name, value = Flow
            context.write(new Text(arr[2]), f);
        }
    }

  4. Reducer class:

    package com.quosimodo.partflow;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // Total traffic for this person
            for (Flow value : values) {
                sum += value.getUpFlow() + value.getDownFlow();
            }
            context.write(key, new IntWritable(sum));
        }
    }

  5. Driver class:

    package com.quosimodo.partflow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class PartFlowDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(PartFlowDriver.class);
            job.setMapperClass(PartFlowMapper.class);
            job.setReducerClass(PartFlowReducer.class);

            // Register the Partitioner
            job.setPartitionerClass(CityPartitioner.class);
            // One ReduceTask per partition
            job.setNumReduceTasks(3);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Flow.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/txt/flow.txt"));
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/part_flow"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

After the job runs, the corresponding files appear under the /result directory on HDFS, one output file per ReduceTask.


Total score per person, by month

Data (directory: score):

# Chinese.txt
1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27
1 li 77
2 li 66
3 li 89

# english.txt
1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86
1 li 76
2 li 84
3 li 93

# math.txt
1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96
1 li 45
2 li 76
3 li 67

Upload the three files to the /txt/score directory on HDFS.

  1. Data storage class:

    package com.quosimodo.partscore;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Score implements Writable {

        private int month;
        private int score;

        public int getMonth() {
            return month;
        }

        public void setMonth(int month) {
            this.month = month;
        }

        public int getScore() {
            return score;
        }

        public void setScore(int score) {
            this.score = score;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(month);
            out.writeInt(score);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.month = in.readInt();
            this.score = in.readInt();
        }
    }

  2. Partitioner class:

    package com.quosimodo.partscore;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class MonthPartitioner extends Partitioner<Text, Score> {
        @Override
        public int getPartition(Text text, Score score, int numPartitions) {
            // Get the month
            int month = score.getMonth();
            // Partition numbers start at 0, so month 1 -> partition 0, and so on
            return month - 1;
        }
    }

  3. Mapper class:

    package com.quosimodo.partscore;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class PartScoreMapper extends Mapper<LongWritable, Text, Text, Score> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1 zhang 89
            String[] arr = value.toString().split(" ");
            // Wrap the Score object
            Score s = new Score();
            s.setMonth(Integer.parseInt(arr[0]));
            s.setScore(Integer.parseInt(arr[2]));
            // Emit: key = name, value = Score
            context.write(new Text(arr[1]), s);
        }
    }

  4. Reducer class:

    package com.quosimodo.partscore;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class PartScoreReducer extends Reducer<Text, Score, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Score> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (Score value : values) {
                sum += value.getScore();
            }
            context.write(key, new IntWritable(sum));
        }
    }

  5. Driver class:

    package com.quosimodo.partscore;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class PartScoreDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(PartScoreDriver.class);
            job.setMapperClass(PartScoreMapper.class);
            job.setReducerClass(PartScoreReducer.class);

            job.setPartitionerClass(MonthPartitioner.class);
            job.setNumReduceTasks(3);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Score.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/txt/score/"));
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/part_score"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

After the job runs, the corresponding files appear under the /result directory on HDFS, one output file per ReduceTask.


Sorting

Sort the earlier total traffic in descending order

Data: the per-person totals produced by the serialization exercise (originally from flow.txt).

The job reads them directly from the /result/serial_flow directory on HDFS.

  1. Data storage class:

    package com.quosimodo.sortflow;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Flow implements WritableComparable<Flow> {

        private String name = "";
        private int totalFlow;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getTotalFlow() {
            return totalFlow;
        }

        public void setTotalFlow(int totalFlow) {
            this.totalFlow = totalFlow;
        }

        // Sort in descending order of each person's total traffic
        @Override
        public int compareTo(Flow o) {
            return o.totalFlow - this.totalFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeInt(totalFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readUTF();
            this.totalFlow = in.readInt();
        }
    }

  2. Mapper class:

    package com.quosimodo.sortflow;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class SortFlowMapper extends Mapper<LongWritable, Text, Flow, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // adair 33129
            // Split the fields (the previous job's output is tab-separated)
            String[] arr = value.toString().split("\t");
            // Wrap the Flow object
            Flow f = new Flow();
            f.setName(arr[0]);
            f.setTotalFlow(Integer.parseInt(arr[1]));
            // Emit the Flow object as the key so MapReduce sorts by it
            context.write(f, NullWritable.get());
        }
    }

  3. Reducer class:

    package com.quosimodo.sortflow;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class SortFlowReducer extends Reducer<Flow, NullWritable, Text, IntWritable> {
        @Override
        protected void reduce(Flow key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(new Text(key.getName()), new IntWritable(key.getTotalFlow()));
        }
    }

  4. Driver class:

    package com.quosimodo.sortflow;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class SortFlowDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(SortFlowDriver.class);
            job.setMapperClass(SortFlowMapper.class);
            job.setReducerClass(SortFlowReducer.class);

            job.setMapOutputKeyClass(Flow.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // The input is the output of the serialization job
            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/result/serial_flow"));
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/sort_flow"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

    Here hadoop is the hostname mapped in the hosts file set up earlier; it points to the VM's IP.

After the job runs, the corresponding result appears under the /result directory on HDFS.


Sort by month ascending, then by profit descending within the same month

Data (profit.txt):

2 rose 345
1 rose 235
1 tom 234
2 jim 572
3 rose 123
1 jim 321
2 tom 573
3 jim 876
3 tom 648

Upload it to the /txt directory on HDFS.

  1. Data storage class:

    package com.quosimodo.sortprofit;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class Profit implements WritableComparable<Profit> {

        private int month;
        private String name = "";
        private int profit;

        public int getMonth() {
            return month;
        }

        public void setMonth(int month) {
            this.month = month;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getProfit() {
            return profit;
        }

        public void setProfit(int profit) {
            this.profit = profit;
        }

        @Override
        public int compareTo(Profit o) {
            // Sort by month first (ascending)
            int r = this.getMonth() - o.getMonth();
            // Within the same month, sort by profit (descending)
            if (r == 0)
                return o.getProfit() - this.getProfit();
            return r;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(month);
            out.writeUTF(name);
            out.writeInt(profit);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.month = in.readInt();
            this.name = in.readUTF();
            this.profit = in.readInt();
        }

        @Override
        public String toString() {
            return "Profit{" +
                    "month=" + month +
                    ", name='" + name + '\'' +
                    ", profit=" + profit +
                    '}';
        }
    }

  2. Mapper class:

    package com.quosimodo.sortprofit;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class SortProfitMapper extends Mapper<LongWritable, Text, Profit, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 2 rose 345
            String[] arr = value.toString().split(" ");
            // Wrap the Profit object
            Profit p = new Profit();
            p.setMonth(Integer.parseInt(arr[0]));
            p.setName(arr[1]);
            p.setProfit(Integer.parseInt(arr[2]));
            // Emit the Profit object as the key so MapReduce sorts by it
            context.write(p, NullWritable.get());
        }
    }

  3. Reducer class:

    package com.quosimodo.sortprofit;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class SortProfitReducer extends Reducer<Profit, NullWritable, Profit, NullWritable> {
        @Override
        protected void reduce(Profit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

  4. Driver class:

    package com.quosimodo.sortprofit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class SortProfitDriver {

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(SortProfitDriver.class);
            job.setMapperClass(SortProfitMapper.class);
            job.setReducerClass(SortProfitReducer.class);

            job.setOutputKeyClass(Profit.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job,
                    new Path("hdfs://hadoop:9000/txt/profit.txt"));
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://hadoop:9000/result/sort_profit"));

            // Submit the job and wait for it to finish
            job.waitForCompletion(true);
        }
    }

    Here hadoop is the hostname mapped in the hosts file set up earlier; it points to the VM's IP.

After the job runs, the corresponding result appears under the /result directory on HDFS.


Unless otherwise stated, all posts on this blog are licensed under CC BY-SA 4.0. Please credit the source when reposting.