Project Training: Day 5

Last updated: July 23, 2021 (evening)


Flume

Sink

Avro Sink

  1. An Avro Sink serializes events with Avro before writing them out; paired with an Avro Source on the receiving side, it enables multi-hop, fan-in, and fan-out flows.

  2. Multi-hop flow

    1. Configuration for the first node

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop02
      a1.sinks.k1.port = 8090

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. Configuration for the second node

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 8090

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    3. Configuration for the third node

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = logger

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    4. Startup command (run on each node against that node's configuration file)

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f duoji.conf -Dflume.root.logger=INFO,console
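    5. Smoke test

      Start the agents downstream-first (third node, then second, then first), since an Avro Sink fails to connect if its downstream Avro Source is not yet listening. Once all three are up, a quick check; a sketch, assuming the first node runs on hadoop01 (the hostname is an assumption):

      # Send one line to the netcat source on the first node; it should
      # show up in the logger output on the third node
      echo "hello flume" | nc hadoop01 8090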
  3. Fan-in flow

    1. Configuration for the first and second nodes

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 8090

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. Configuration for the third node

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = logger

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
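    3. Startup command

      Launch each agent the same way as in the multi-hop example, starting the third (collector) node before the first two. A sketch, assuming the configuration file is named shanru.conf (the file name is an assumption):

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f shanru.conf -Dflume.root.logger=INFO,console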
  4. Fan-out flow

    1. Configuration for the first node

      a1.sources = s1
      a1.channels = c1 c2
      a1.sinks = k1 k2

      a1.sources.s1.type = netcat
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.channels.c2.type = memory

      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop02
      a1.sinks.k1.port = 8090

      a1.sinks.k2.type = avro
      a1.sinks.k2.hostname = hadoop03
      a1.sinks.k2.port = 8090

      a1.sources.s1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2
    2. Configuration for the second and third nodes

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = avro
      a1.sources.s1.bind = 0.0.0.0
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory

      a1.sinks.k1.type = logger

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
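    3. Startup and channel selector

      Start the second and third nodes first, then the first node, each with the same flume-ng command as before. Binding the source to two channels (a1.sources.s1.channels = c1 c2) uses Flume's default replicating channel selector, so every event is copied to both channels and both downstream nodes see the full stream. To route events by a header value instead, the first node could use a multiplexing selector; a sketch, where the header name and mapping values are assumptions:

      a1.sources.s1.selector.type = multiplexing
      a1.sources.s1.selector.header = kind
      a1.sources.s1.selector.mapping.video = c1
      a1.sources.s1.selector.mapping.log = c2
      a1.sources.s1.selector.default = c2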

Custom Sink

  1. To define a custom sink, write a class that implements the Sink interface (typically by extending AbstractSink); to read properties from the configuration file, the class should also implement the Configurable interface.

  2. When implementing a sink, pay attention to Flume's transaction semantics: every batch taken from the channel must happen inside a transaction that is committed on success and rolled back on failure, as the process() method in the AuthSink example below demonstrates.

  3. Configuration

    1. Configuration file

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1

      a1.sources.s1.type = http
      a1.sources.s1.port = 8090

      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100

      # Configure the custom sink - the fully qualified class name is required
      a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink
      a1.sinks.k1.path = /opt/flumedata

      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    2. Startup command

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f authsink.conf -Dflume.root.logger=INFO,console
    3. Send a POST request

      curl -X POST -d '[{"headers":{"class":"big data"},"body":"big data training"}]' http://hadoop:8090
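      If everything is wired correctly, the event's headers and body are written to a timestamp-named file under /opt/flumedata. Note that the directory must exist before the agent starts: the sink only opens a file inside it (see start() below) and does not create the directory itself.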

    Example

    Flume -> sink (package) -> AuthSink.java:

    package cn.tedu.flume.sink;

    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;

    import java.io.FileNotFoundException;
    import java.io.PrintStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Mimics the File Roll Sink: writes events to files under a given directory
    public class AuthSink extends AbstractSink implements Sink, Configurable {

        private String path;
        private PrintStream ps;

        // Read the configured properties
        @Override
        public void configure(Context context) {
            // Get the user-specified output path
            path = context.getString("path");
            // Fail fast if no output path was specified
            if (path == null)
                throw new IllegalArgumentException("The path property is not specified!");
        }

        // Start the sink
        @Override
        public synchronized void start() {
            // Initialize the output stream; the file is named after the current timestamp
            try {
                ps = new PrintStream(path + "/" + System.currentTimeMillis());
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }

        @Override
        public Status process() {
            // Get the channel this sink is attached to
            Channel c = this.getChannel();
            // Get a transaction from the channel
            Transaction t = c.getTransaction();
            // Start the transaction
            t.begin();
            // Holds each event taken from the channel
            Event e;
            try {
                // Take events from the channel until it is drained
                while ((e = c.take()) != null) {
                    // Write out the headers
                    ps.println("headers:");
                    Map<String, String> headers = e.getHeaders();
                    for (Map.Entry<String, String> header : headers.entrySet()) {
                        ps.println("\t" + header.getKey() + "=" + header.getValue());
                    }
                    // Write out the body
                    ps.println("body:");
                    byte[] body = e.getBody();
                    ps.println("\t" + new String(body, StandardCharsets.UTF_8));
                }
                // Commit the transaction
                t.commit();
                return Status.READY;
            } catch (Exception ex) {
                ex.printStackTrace();
                // Roll back so the events stay in the channel
                t.rollback();
                return Status.BACKOFF;
            } finally {
                // Close the transaction
                t.close();
            }
        }

        // Stop the sink and release the stream
        @Override
        public synchronized void stop() {
            if (ps != null)
                ps.close();
        }
    }
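    For a1.sinks.k1.type = cn.tedu.flume.sink.AuthSink to resolve, the compiled class must be on the agent's classpath. A minimal deployment sketch, assuming the class is packaged into a jar named flume-auth.jar (the jar name is an assumption):

    # Put the jar where the agent can load the custom sink class
    cp flume-auth.jar $FLUME_HOME/lib/
    # Create the output directory expected by the path property
    mkdir -p /opt/flumedata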

    Flume -> source (package) -> AuthSource.java:

    package cn.tedu.flume.source;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDrivenSource;
    import org.apache.flume.channel.ChannelProcessor;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.AbstractSource;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Mimics the Sequence Generator Source
    public class AuthSource extends AbstractSource implements EventDrivenSource, Configurable {

        private long step; // increment step
        private long end;  // upper bound of the sequence
        private ExecutorService es; // thread pool

        // Read the configured properties
        @Override
        public void configure(Context context) {
            // Use the user-specified step; default to 1 if none is given
            step = context.getLong("step", 1L);
            // Count up to the user-specified bound; default to Long.MAX_VALUE
            end = context.getLong("end", Long.MAX_VALUE);
        }

        // Start the source
        @Override
        public synchronized void start() {
            // Initialize the thread pool
            es = Executors.newFixedThreadPool(5);
            // Get the channel processor that forwards events to the channel(s)
            ChannelProcessor cp = this.getChannelProcessor();
            // Submit the generator task
            es.submit(new Add(step, end, cp));
        }

        @Override
        public synchronized void stop() {
            if (es != null)
                es.shutdown();
        }

    }

    class Add implements Runnable {

        private final long step;
        private final long end;
        private final ChannelProcessor cp;

        public Add(long step, long end, ChannelProcessor cp) {
            this.step = step;
            this.end = end;
            this.cp = cp;
        }

        @Override
        public void run() {
            for (long i = 0; i < end; i += step) {
                // Build the headers
                Map<String, String> headers = new HashMap<>();
                headers.put("time", System.currentTimeMillis() + "");
                // Build the body
                byte[] body = (i + "").getBytes(StandardCharsets.UTF_8);
                // Wrap the data in an Event
                Event e = EventBuilder.withBody(body, headers);
                // Hand the event to the channel processor, which writes it to the channel
                cp.processEvent(e);
            }
        }
    }
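    The post gives no configuration for AuthSource, so here is a minimal sketch that pairs it with a logger sink. The step and end properties correspond to the context.getLong(...) calls in configure(); the values shown are assumptions:

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1

    # Custom source - the fully qualified class name is required
    a1.sources.s1.type = cn.tedu.flume.source.AuthSource
    a1.sources.s1.step = 2
    a1.sources.s1.end = 100

    a1.channels.c1.type = memory

    a1.sinks.k1.type = logger

    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1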


Unless otherwise stated, all posts on this blog are licensed under CC BY-SA 4.0. Please credit the source when reposting!