There is already plenty of Flume documentation online, so this article focuses on getting started and on a few common scenarios, and recommends some good references along the way.
You can refer to that document; it will not be repeated here.
Flume NG (1.x) is a rewrite of Flume 0.9.x. Flume NG drops the earlier Master, ZooKeeper, Collector, and Web console; an agent now consists only of sources, sinks, and channels.
Install the JDK (omitted here).
Download the Flume binary tarball from the download page on the official website.
Unpack it:
tar zxvf apache-flume-1.7.0-bin.tar.gz
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cd apache-flume-1.7.0-bin/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ vim conf/flume-conf.properties
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cat conf/flume-conf.properties
# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = file_roll
tier1.sinks.k1.sink.directory = /home/vagrant/flume-test/collector
tier1.sinks.k1.sink.rollInterval = 3600

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/collector
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/checkpointdir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/datadir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ touch /home/vagrant/flume-test/flume-test.log
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name tier1 -Dflume.root.logger=INFO,console
Here, --name specifies the agent's name; it must match the agent name used in the configuration file (tier1 in this case).
[vagrant@hadoop-102 ~]$ cd /home/vagrant/flume-test/
[vagrant@hadoop-102 ~]$ echo testlog >>flume-test.log
[vagrant@hadoop-102 flume-test]$ ls collector/
1505963145080-1
[vagrant@hadoop-102 flume-test]$ cat collector/1505963145080-1
testlog
As you can see, the log line has been collected.
Flume ships with many sources, channels, and sinks, including a Kafka sink. Sending collected logs to Kafka therefore only takes a small configuration change. Here is an example:
# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.bootstrap.servers = 192.168.100.101:9092,192.168.100.102:9092
tier1.sinks.k1.kafka.topic = testflume

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
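Beyond the two required keys above, the Kafka sink exposes a few tuning knobs; the fragment below is an optional sketch with illustrative values (the property names are the ones documented for the Flume 1.7 Kafka sink, the values are not recommendations):

```properties
# optional Kafka sink tuning (illustrative values)
tier1.sinks.k1.kafka.flumeBatchSize = 100
tier1.sinks.k1.kafka.producer.acks = 1
tier1.sinks.k1.kafka.producer.linger.ms = 5
```

You can check that events actually arrive with a console consumer on one of the brokers, e.g. `bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.101:9092 --topic testflume --from-beginning`.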
For installing Kafka and ZooKeeper, you can refer to:
You may want to read this document first.
Here is an example:
SearchInterceptor.java:
package com.gomeplus.bigdata.log;

import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

@Slf4j
public class SearchInterceptor implements Interceptor {

    private String hostName;
    private String charset;

    public static class Builder implements Interceptor.Builder {
        private String hostName = "localhost";
        private String charset = "utf8";

        {
            try {
                InetAddress address = InetAddress.getLocalHost();
                hostName = address.getHostName();
            } catch (UnknownHostException ex) {
                log.error(ex.getMessage());
            }
        }

        public Interceptor build() {
            return new SearchInterceptor(hostName, charset);
        }

        public void configure(Context context) {
            hostName = context.getString("hostname", hostName);
            charset = context.getString("charset", charset);
        }
    }

    private SearchInterceptor(String hostName, String charset) {
        this.hostName = hostName;
        this.charset = charset;
    }

    public Event intercept(Event event) {
        String body;
        try {
            body = new String(event.getBody(), charset);
            event.setBody((body + "\t" + hostName).getBytes(charset));
        } catch (UnsupportedEncodingException ex) {
            log.error(ex.getMessage());
            return event;
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void initialize() {
        // NO-OP
    }

    public void close() {
        // NO-OP
    }
}
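The transformation the interceptor performs is easy to see in isolation. Here is a toy re-implementation of its string logic in Python (not Flume code, just a sketch of what `intercept` does to an event body):

```python
import socket

def intercept(body: bytes, hostname: str = None, charset: str = "utf8") -> bytes:
    """Mirror of SearchInterceptor.intercept(): append TAB + host name to the body."""
    if hostname is None:
        # analogous to InetAddress.getLocalHost().getHostName() in the Builder
        hostname = socket.gethostname()
    return body + ("\t" + hostname).encode(charset)

print(intercept(b"testlog", hostname="test_host_name"))  # b'testlog\ttest_host_name'
```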
pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
flume-conf.properties:
# interceptor configuration
tier1.sources.r1.interceptors = i1
tier1.sources.r1.interceptors.i1.type = com.gomeplus.bigdata.log.SearchInterceptor$Builder
tier1.sources.r1.interceptors.i1.hostname = test_host_name
tier1.sources.r1.interceptors.i1.charset = UTF8
After building the jar, copy it into the <flume install directory>/lib/ directory and start the Flume agent.
Translated from the official Flume documentation.
Flume supports fanning out from one source to multiple channels. Flume ships with two fan-out policies: replicating (the default) and multiplexing.
With the replicating policy, an event is sent to every channel configured for the source; with the multiplexing policy, an event is sent only to a subset of the channels, as needed.
To fan out, configure multiple channels for the source and use selector.type to specify the fan-out policy. Here is an example:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
With the multiplexing policy, channels can be chosen based on one of the event's headers: when the header matches a configured value, the event is sent to the channels mapped to that value; when the header matches none of the configured values, the event is sent to the default channels. For example:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
In the example below, an agent named agent_foo has a single avro source, two channels, and a sink attached to each channel. The selector checks the event's State header: if its value is CA, the event is sent to mem-channel-1; if it is AZ, the event is sent to file-channel-2; if it is NY, the event is sent to both mem-channel-1 and file-channel-2; otherwise, the event is sent to the default, mem-channel-1:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
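To make the routing concrete, here is a toy simulation of how the mapping above picks channels from the State header (plain Python with hypothetical helper names, not Flume's actual selector class):

```python
# channel routing from the multiplexing example above (toy model)
MAPPING = {
    "CA": ["mem-channel-1"],
    "AZ": ["file-channel-2"],
    "NY": ["mem-channel-1", "file-channel-2"],
}
DEFAULT = ["mem-channel-1"]

def select_channels(headers: dict) -> list:
    """Pick the target channels for an event from its State header,
    falling back to the default channels when nothing matches."""
    return MAPPING.get(headers.get("State"), DEFAULT)

print(select_channels({"State": "NY"}))  # ['mem-channel-1', 'file-channel-2']
print(select_channels({"State": "TX"}))  # ['mem-channel-1'] (unmatched -> default)
```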
The selector first tries to write the event to all required channels; if even one of those channels fails to consume the event, the whole transaction fails, and the selector then retries on all of the channels. Once every required channel has consumed the event, the selector tries to write it to the optional channels; a failure by any optional channel is simply ignored and not retried. Here is an example with optional channels:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
If a channel appears in both the required and the optional lists, it is considered required.
If there are no required channels, the selector writes the event to the default channels and attempts to write it to the optional channels;
if there are neither required channels nor default channels, the selector attempts to write the event to the optional channels only.
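The delivery semantics above can be sketched as follows (a toy model with hypothetical names, not Flume's actual transaction machinery): a failure on a required channel aborts the whole write, while optional failures are swallowed:

```python
def deliver(event, required, optional, put):
    """put(channel, event) raises on failure.
    A required-channel failure propagates (the transaction fails and is retried
    by the caller); an optional-channel failure is ignored and never retried."""
    for ch in required:
        put(ch, event)            # any exception here aborts the transaction
    for ch in optional:
        try:
            put(ch, event)
        except Exception:
            pass                  # optional failures are silently dropped

# example: one flaky optional channel does not affect the others
written = []
def put(ch, event):
    if ch == "flaky-optional":
        raise IOError("channel full")
    written.append(ch)

deliver("e1", required=["mem-channel-1"],
        optional=["flaky-optional", "file-channel-2"], put=put)
print(written)  # ['mem-channel-1', 'file-channel-2']
```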