Preface

There is already plenty of Flume documentation online, so this article focuses on getting started and on a few common scenarios, with pointers to some good references along the way.


How Flume works

See this document for the details; they are not repeated here.
Flume NG (1.x) is a rewrite of Flume 0.9.x. Flume NG drops the Master, Zookeeper, Collector, and Web console of the old architecture; an agent now consists only of sources, channels, and sinks.
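
To make the three-component model concrete, here is a minimal sketch of a single agent (the name a1 and all values are illustrative) that wires a netcat source through a memory channel to a logger sink; the walkthrough below uses the same pattern with different component types.

# a minimal agent: one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# netcat source: listens on a TCP port, one event per line
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# memory channel: buffers events in memory
a1.channels.c1.type = memory

# logger sink: logs each event at INFO level
a1.sinks.k1.type = logger

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1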


Installing Flume

tar zxvf apache-flume-1.7.0-bin.tar.gz
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cd apache-flume-1.7.0-bin/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ vim conf/flume-conf.properties
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cat conf/flume-conf.properties
# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = file_roll
tier1.sinks.k1.sink.directory = /home/vagrant/flume-test/collector
tier1.sinks.k1.sink.rollInterval = 3600

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/collector
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/checkpointdir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/datadir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ touch /home/vagrant/flume-test/flume-test.log
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name tier1 -Dflume.root.logger=INFO,console

Here, --name specifies the agent name; it must match the property prefix used in the configuration file (tier1 in this example).

[vagrant@hadoop-102 ~]$ cd /home/vagrant/flume-test/
[vagrant@hadoop-102 flume-test]$ echo testlog >> flume-test.log
[vagrant@hadoop-102 flume-test]$ ls collector/
1505963145080-1
[vagrant@hadoop-102 flume-test]$ cat collector/1505963145080-1
testlog

As you can see, the log entry has been collected.


Advanced - integrating Flume with Kafka

Flume ships with many sources, channels, and sinks, including a Kafka sink, so the integration only takes a small configuration change. Here is an example:

# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.bootstrap.servers = 192.168.100.101:9092,192.168.100.102:9092
tier1.sinks.k1.kafka.topic = testflume

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
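
To check that events are arriving, you can attach a console consumer to the topic. This is a sketch: it assumes Kafka is installed on the broker hosts listed above and that the testflume topic exists (or auto-creation is enabled).

# on a machine with Kafka installed, from the Kafka directory
bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.101:9092 --topic testflume

# in another terminal, append a line to the tailed file
echo testkafka >> /home/vagrant/flume-test/flume-test.log
# the consumer should print: testkafka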

For installing Kafka and Zookeeper, see:


Custom interceptors

You may want to read this document first.

Here is an example:
SearchInterceptor.java:

package com.gomeplus.bigdata.log;

import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

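/**
 * Appends the agent's hostname to the body of every event, separated by a tab.
 * The hostname and charset can be overridden via the interceptor configuration.
 */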
@Slf4j
public class SearchInterceptor implements Interceptor {
    private String hostName;
    private String charset;

    public static class Builder implements Interceptor.Builder {
        private String hostName = "localhost";
        private String charset = "utf8";

        {
            try {
                InetAddress address = InetAddress.getLocalHost();
                hostName = address.getHostName();
            } catch (UnknownHostException ex) {
                log.error(ex.getMessage());
            }
        }

        public Interceptor build() {
            return new SearchInterceptor(hostName, charset);
        }

        public void configure(Context context) {
            hostName = context.getString("hostname", hostName);
            charset = context.getString("charset", charset);
        }
    }

    private SearchInterceptor(String hostName, String charset) {
        this.hostName = hostName;
        this.charset = charset;
    }

    public Event intercept(Event event) {
        String body;
        try {
            body = new String(event.getBody(), charset);
            event.setBody((body + "\t" + hostName).getBytes(charset));
        } catch (UnsupportedEncodingException ex) {
            log.error(ex.getMessage());
            return event;
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event: events)
            intercept(event);
        return events;
    }

    public void initialize() {
        // NO-OP
    }

    public void close() {
        // NO-OP
    }
}

pom.xml:

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

flume-conf.properties:

# interceptors
tier1.sources.r1.interceptors = i1
tier1.sources.r1.interceptors.i1.type = com.gomeplus.bigdata.log.SearchInterceptor$Builder
tier1.sources.r1.interceptors.i1.hostname = test_host_name
tier1.sources.r1.interceptors.i1.charset = UTF8

After building the jar, copy it into <flume install dir>/lib/, then start the flume agent.
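
With the configuration above, a collected line such as testlog comes out as testlog<TAB>test_host_name. A build-and-deploy sketch follows; the artifact name search-interceptor-1.0.jar is hypothetical and depends on your pom.xml coordinates, and Flume is assumed to live under /home/vagrant/apache-flume-1.7.0-bin.

# build the jar with Maven
mvn clean package

# copy it into Flume's lib directory
cp target/search-interceptor-1.0.jar /home/vagrant/apache-flume-1.7.0-bin/lib/

# restart the agent
cd /home/vagrant/apache-flume-1.7.0-bin
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name tier1 -Dflume.root.logger=INFO,console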


Fan-out

Translated from: the official Flume documentation.

Flume supports fanning out the flow from one source to multiple channels. Two built-in fan-out policies are available: replicating (the default) and multiplexing.
With the replicating policy, an event is sent to every channel configured for the source; with the multiplexing policy, an event is sent only to a subset of those channels.
To fan out, list multiple channels for the source and specify the policy with selector.type. Here is an example:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

When using the multiplexing policy, a header of the event is used to select channels: when the header matches a configured value, the event is sent to the channels mapped to that value; if the header matches none of the configured values, the event is sent to the default channels. For example:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

In the example below, an agent named agent_foo has a single avro source and two channels, each connected to one sink. The selector checks the State header of each event: if its value is CA, the event is sent to mem-channel-1; if it is AZ, to file-channel-2; if it is NY, to both mem-channel-1 and file-channel-2; otherwise, the event goes to the default, mem-channel-1.

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector first tries to write the event to all required channels; if even one of those channels fails to consume the event, the whole transaction fails and is retried on all of the channels. Once every required channel has consumed the event, the selector writes it to the optional channels; a failure by any optional channel is simply ignored and not retried. Here is an example with optional channels:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

If the required channels and the optional channels overlap, those channels are treated as required.
If a header value has no required channels, the event is written to the default channels, and the selector also attempts to write it to the optional channels for that value;
if there are neither required nor default channels, the selector attempts to write the event to the optional channels only.
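
For instance, continuing the example above (the header value BOS is hypothetical): BOS has no required mapping, so an event with State = BOS is written to the default channel mem-channel-1, and the selector also attempts to write it to the optional channel file-channel-2, ignoring any failure there.

# BOS is mapped only as optional (hypothetical header value)
agent_foo.sources.avro-AppSrv-source1.selector.optional.BOS = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1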