0%

Flume使用、优化及扩展

Flume

Flume是一个开箱即用的数据传输组件,可提供分布式、可用、可靠的数据收集、聚合、移动服务,具有伸缩的健壮性、容错性,failover及数据恢复机制。Flume使用Event数据模型在集群内多个Flume节点间进行数据流转,支持在线的数据分析应用。

Flume模型

Flume的实例称为agent,是集群内一个完整的节点,每个agent包含三个组件:Source,Channel,Sink,数据被封装成Event在多个agent间顺序流经Source、Channel、Sink,这便是Flume的模型结构。一个agent内,Source与Sink为单独线程,通过Channel独立的进行数据的写入和提取。

  • Source使用特定协议从上游数据源接收Event,流入agent,写进Channel;
  • Channel缓存Source流入的Event,被Sink处理。Channel是一个数据换从队列,有多种存储方案,能缓冲流出失败的数据(可靠性),使用磁盘或外部存储方案的Channel能使节点从故障中进行恢复(故障恢复);
  • Sink提取Channel中的Event,使Event流出agent,流向外部存储或下游数据节点,Sink使用事务进行Event递交,Event递交成功后才会从Channel中移除(可靠性)。

Flume模型允许使用者构建多节点的数据流,生成复杂的有向无环的拓扑结构。

Flume使用

Flume具有开箱即用的特性,以当前1.8为例,只需

  1. 具有8及以上版本jre环境
  2. 磁盘文件访问权限
  3. 足够的内存空间(尤其使用Memory Channel时)
  4. 足够的磁盘空间(尤其使用File Channel时)
  5. 有效的Flume配置文件
    1
    flume-ng agent -n $agent_name -c conf -f $configfile

    Flume配置文件

    配置文件声明了agent内Source、Channel、Sink的配置信息,Event的流入流出方向,多个连通的agent的配置文件,描述了整体数据流的拓扑结构。

一个有效的Flume基本配置信息包含四个部分

组件声明

声明agent内各组件个数名称

1
2
3
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

配置声明a1 agent中r1、r2两个Source,一个Sink k1,一个Channel c1。

Source配置

各Source的类型及其配置,使用的Source类型不同,配置参数也会不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r2.type = TAILDIR
a1.sources.r2.channels = c1
a1.sources.r2.positionFile = /var/log/flume/taildir_position.json
a1.sources.r2.filegroups = f1 f2
a1.sources.r2.filegroups.f1 = /var/log/test1/example.log
a1.sources.r2.headers.f1.headerKey1 = value1
a1.sources.r2.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r2.headers.f2.headerKey1 = value2
a1.sources.r2.headers.f2.headerKey2 = value2-2
a1.sources.r2.fileHeader = true

对组件声明内的两个Source进行配置,r1为ThriftSource,在44444端口监听thrift协议的数据流入,数据缓存入c1 Channel;r2为TaildirSource,以本机指定的文件为数据输入,并监听文件的写入,文件的读取进度行号保存在positionFile中,r2也以c1为Channel。

Flume预定义的Source包含Avro,Thrift,Exec,JMS,Spooling,Taildir,Twitter,Kafka,NetCat,Sequence Generator,Syslog,Http等类型。Avro Source与Avro Sink常用于连接多个agent。

Channel配置

Channel的类型配置,存储空间参数配置等

1
2
3
4
5
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

对组件声明的c1进行配置,类型为MemoryChannel,及数据缓存在内存中,使用MemoryChannel时特别需要注意机器应具备足够的空闲内存,agent故障重启后,Channel中的数据会丢失,但是MemoryChannel是所有Channel中最高效的。capacity及byteCapacity确定了Channel使用的存储空间大小,transactionCapacity则指定Sink事务一次可提取的Event数量。

Flume预定义的Channel包含Memory,JDBC,Kafka,File,Spillable等类型。当一个Source被冗余写入多个Channel时,产生了数据流备份,而多个Sink写入一个Channel时,产生了数据流合并。

Sink配置

Sink的类型配置,Sink的流向配置等

1
2
3
4
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

对组件声明的k1进行配置,类型为AvroSink,数据流出到下游10.10.10.10:4545端口。

Flume预定义的Sink包含HDFS,Hive,Logger,Avro,Thrift,IRC,File Roll, Null,HBase,ElasticSearch,Kafka,HTTP等类型。多个Sink从Channel取数据时,根据下游数据流出的配置,如果Sink流出到不同的下游,则产生了数据流的分发。

额外配置

为了适应多样的数据流,还有一些额外配置

  • Flume Channel Selector,指明Event写入Channel的策略,默认为replicating,即冗余写入配置的Channel,从而实现了数据流的备份,Flume预定义的Selector还有multiplexing,可根据header值选择写入的Channel,用于数据分流;
  • File Sink Processor,将多个Sink绑定到一起,指定Sink对Event的处理行为,默认Default Sink Processor只接受一个Sink,还可选择Failover Sink Processor,言即将失败的Event fail-over到同组其他Sink中,还可使用LoadBalance Sink Processor,来做分发的负载均衡;
  • Event Serializers,Event的序列化类;
  • Flume Interceptors,用于Source接受Event后对Event的处理,如数据清洗,打标签,去标签等。

Flume调优

分布式系统中的调优,有集群优化及节点性能优化多种手段。

集群优化

以数据存取的速度排序来看,跨机架的数据流动性能优于跨机房数据流动,同节点上数据流动性能优于跨机架数据流动,最好是在进程内流动,也即是说,在设计Flume数据流拓扑结构的时候,除非必要,应:

  • 尽量减少数据流动次数,避免数据反复进出机架或机房;
  • 增加单节点对数据的处理深度,以减少数据流层次;
  • 水平扩展节点,提高集群吞吐。

节点优化

节点优化关注在agent的吞吐能力上。具体来讲,有以下几个方面的优化手段,同时我们还需要考虑到稳定性及数据安全性:

  1. 事务批量提交的Event数量。前面提到过,Sink使用事务进行数据向下游的递交,单次事务中提交的数据数量,是影响节点吞吐量的最主要原因。事务一次提交的Event数量越多,吞吐量越大,但同时也意味着,数据向下游流出的延迟越大,当发生事务回滚时,下游可能产生的重复记录就越多。Sink的配置中batch-size用于配置一次提交的Event数量(不同的Sink可能会有略微区别)。
    • Channel中transactionCapability参数(Sink的事务每次能从此Channel中提取的Event的数量的最大值)对单个事务提交的Event数量具有限制所用,batch-size不得大于transactionCapability。
    • 在一个数据流的有向无环图中,下游的数据节点的batch-size之应不小于上游数据节点的batch-size之和,否则,下游节点可能成为数据流通道流量的瓶颈,最好是二者相等,这样即平衡了上下游的吞吐能力,又减少了下游数据的延迟。
  2. Channel选择。Source和Sink在进行事务内数据的写入或提取时,都需要先打开通道,并完成数据的写入或提取后才可以允许事务确认,在上下游节点未发生阻塞的情况下,Channel的速度能决定数据在节点中的驻留时间,由于内存读写远超磁盘读写速度,更远超网络读写速度,因此,MemoryChannel在数据流动性能上要优于FileChannel,后者又优于Kafka和JDBCChannel。
    • 从安全性上来讲,MemoryChannel并不是一个好的选择,毕竟,在集群环境中,节点宕机还是相对常见的事情,故障发生后,节点Channel中由上游节点提交而尚未提交到下游节点的数据就丢失了。
  3. 并联。除了集群层次水平扩展节点外,Flume也允许在节点内进行并发线路的调整,通过增加Channel或Sink(增加数据流动并发能力)来提高agent吞吐,从安全性上考虑,尽量使用较小的batch-size,吞吐量上的不足,可以通过节点或节点内并发线路的增加来弥补。

如果使用了Serializer或Interceptor,也可能对集群的效率产生影响。

Flume开发

除了Flume开箱即用的一些组件,使用者还可通过实现模块的接口,扩展Flume的功能,只需将实现的模块jar文件丢到$FLUME_CLASSPATH目录下就能在配置文件中配置使用用户自定义模块。但是还是建议一个规范的文件结构管理非官方发布的组件:将组件文件放在$FLUME_HOME/plugins.d目录中,flume-ng命令会启动脚本在此目录中查找符合规范的模块,并添加到java的classpath路径中。plugins.d目录内模块文件子目录规范如下:

  1. lib - 实现模块的jar文件
  2. libext - 模块的依赖jar文件
  3. native - 模块依赖的native库,如.so文件

如:

1
2
3
4
5
6
7
plugins.d/ 
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so

关于Flume模块接口的实现,后面再开一篇文进行介绍。