Interceptor 数据在Flume节点间流动时,是用Event模型封装的,Interceptor可以在Event流经节点的Source时,对Event进行操作,包括对Event进行过滤实现数据清洗,添加Event标记以实现特殊功能如数据分流等。
Interceptor配置在agent的Source中,按配置的顺序生效,Interceptor本身也可读取Flume的上下文配置信息。
随包发布的Interceptor Flume随包发布了若干个Interceptor,覆盖了常用的大多数场景。以下是一个简单的时间戳Iterceptor配置
1 2 3 a1.sources = r1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
流经TimestampInterceptor的Event会被打上timestamp头信息,在某些Sink中,会使用timestamp进行数据落地,如HDFSSink中,对数据按时间写入时,可能使用Event头部的timestamp,此时需要所有的数据流分路上至少有一个节点在Event头部写入了时间戳,以下是一个典型的HDFS Sink的配置
1 2 3 4 5 6 7 8 9 10 11 a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute # 默认为false,及不使用event头部的时间戳,而使用节点上写入时的时间戳,当数据有比较大的延迟时,可能使得较多数据落地到不一致的文件中。 a1.sinks.k1.hdfs.useLocalTimeStamp = false
Interceptor编写 在某些情况下,我们可能需要对Event进行一些自定义的操作,比较常见的是使用Flume进行数据流清洗。甚至有时候,我们可能对Event中封装的数据进行改写,比如,在对nginx日志进行采集时,需要将请求中的session转换成用户编号进行数据定锚,而session又具有一定的实效性,因此我们在数据流经的第一层节点上增加了一个自定义Interceptor,用以获取用户编号后,将其织入Event数据中(业务代码,暂不提供详细描述)。
下面的例子详细描述了实现一个Interceptor的过程
示例Interceptor功能简介 这里使用的例子依然与时间戳有关,在前面使用TimestampInterceptor时,我们举过一个例子,但是数据流入集群时的时间戳并不是数据真正产生的时间戳,最好是能从数据本身提取出时间信息,这种模式在常规的日志框架内都能得到有效支持,不过对于不同的框架、配置,数据中的时间模式可能不尽相同,需要通过节点中的配置文件来指明。
Interceptor接口要求 Inteceptor需要实现org.apache.flume.interceptor.Interceptor接口,实际配置到Flume配置文件中的是Interceptor的构造类,因此,还需要实现org.apache.flume.interceptor.Interceptor.Builder接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package org.apache.flume.interceptor; public interface Interceptor { public void initialize(); public Event intercept(Event event); public List<Event> intercept(List<Event> events); public void close(); public interface Builder extends Configurable { public Interceptor build(); } } package org.apache.flume.conf; public interface Configurable { public void configure(Context context); }
maven依赖为
1 2 3 4 5 <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>{flume_version}</version> </dependency>
实现代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package jeff.flume.interceptor.timeExtractInterceptor; ... public class TimeExtractInterceptor implements Interceptor { private Builder builder; private ThreadLocal<SimpleDateFormat> format; private SimpleDateFormat getFormat() { SimpleDateFormat fmt = format.get(); if (null == fmt) { fmt = new SimpleDateFormat(this.builder.getTimeFormat()); format.set(fmt); } return fmt; } @Override public Event intercept(Event event) { String content = new String(event.getBody()); Matcher matcher = this.builder.getTimePattern().matcher(content); if (matcher.find()) { try { event.getHeaders().put("timestamp", getFormat().parse(matcher.group(0)).getTime() + ""); } catch (ParseException e) { logger.debug("abandon: " + content); return null; } return event; } else { logger.debug("abandon: " + content); return null; } } @Override public List<Event> intercept(List<Event> events) { List<Event> results = new ArrayList<Event>(); for (Event event : events) { Event e = intercept(event); if (null != e) { results.add(event); } } return results; } @Override public void close() { format.remove(); } @Override public void initialize() { format = new ThreadLocal<SimpleDateFormat>(); } public static class Builder implements Interceptor.Builder { ... public void configure(Context context) { String timeFormat = context.getString("TimeFormat", this.TimeFormat); logger.info("read TimeFormat: " + this.TimeFormat); this.setTimeFormat(timeFormat); } @Override public Interceptor build() { return new TimeExtractInterceptor(this); } } }
需要注意以下几个问题
Interceptor的运行环境可能是多线程的,因此两个重载的intercept函数应该是线程安全的,上述代码在对提取出的时间字符串进行解析的时候使用了SimpleDateFormat类,由于其非线程安全,因此使用ThreadLocal将其绑定到线程中;
Interceptor在调用intercept(Event)函数时,返回空为过滤输入Event,调用intercept(List)时,未添加到返回列表中的Event被过滤。
打包 Interceptor打包方式与常见jar包的生成不同,需要遵循Flume插件的部署规则,尽管可以使用jar-with-dependences的方式将Interceptor和依赖打包成一个文件发布到Flume的classpath中,随着插件集成越来越多,Flume的插件会越来越难以管理。
遵循Flume插件的文件结构规范,使用maven-assembly-plugin自定义打包规范,maven的pom.xml文件中增加以下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptors> <descriptor>assembly.xml</descriptor> </descriptors> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
assembly.xml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd"> <formats> <format>tar.gz</format> </formats> <dependencySets> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <outputDirectory>/libext</outputDirectory> </dependencySet> </dependencySets> <fileSets> <fileSet> <directory>src/main/java/resource/</directory> <outputDirectory>/native</outputDirectory> <includes> <include>*.so</include> </includes> </fileSet> </fileSets> <files> <file> <source>target/${artifactId}-${version}.jar</source> <outputDirectory>/lib</outputDirectory> </file> </files> </assembly>
将assembley.xml放置到指定位置
将target目录下生成的压缩包解压到flume目录的plugins中即可配置使用Interceptor了。
1 2 3 4 a1.sources = r1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = jeff.flume.interceptor.timeExtractInterceptor.TimeExtractInterceptor$Builder a1.sources.r1.interceptors.i1.TimeFormat = "yyyy/MM/dd HH:mm:ss"
源码