0%

Flume Interceptor开发过程

Interceptor

数据在Flume节点间流动时,是用Event模型封装的,Interceptor可以在Event流经节点的Source时,对Event进行操作,包括对Event进行过滤实现数据清洗,添加Event标记以实现特殊功能如数据分流等。

Interceptor配置在agent的Source中,按配置的顺序生效,Interceptor本身也可读取Flume的上下文配置信息。

随包发布的Interceptor

Flume随包发布了若干个Interceptor,覆盖了常用的大多数场景。以下是一个简单的时间戳Iterceptor配置

1
2
3
a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

流经TimestampInterceptor的Event会被打上timestamp头信息,在某些Sink中,会使用timestamp进行数据落地,如HDFSSink中,对数据按时间写入时,可能使用Event头部的timestamp,此时需要所有的数据流分路上至少有一个节点在Event头部写入了时间戳,以下是一个典型的HDFS Sink的配置

1
2
3
4
5
6
7
8
9
10
11
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 默认为false,及不使用event头部的时间戳,而使用节点上写入时的时间戳,当数据有比较大的延迟时,可能使得较多数据落地到不一致的文件中。
a1.sinks.k1.hdfs.useLocalTimeStamp = false

Interceptor编写

在某些情况下,我们可能需要对Event进行一些自定义的操作,比较常见的是使用Flume进行数据流清洗。甚至有时候,我们可能对Event中封装的数据进行改写,比如,在对nginx日志进行采集时,需要将请求中的session转换成用户编号进行数据定锚,而session又具有一定的实效性,因此我们在数据流经的第一层节点上增加了一个自定义Interceptor,用以获取用户编号后,将其织入Event数据中(业务代码,暂不提供详细描述)。

下面的例子详细描述了实现一个Interceptor的过程

示例Interceptor功能简介

这里使用的例子依然与时间戳有关,在前面使用TimestampInterceptor时,我们举过一个例子,但是数据流入集群时的时间戳并不是数据真正产生的时间戳,最好是能从数据本身提取出时间信息,这种模式在常规的日志框架内都能得到有效支持,不过对于不同的框架、配置,数据中的时间模式可能不尽相同,需要通过节点中的配置文件来指明。

Interceptor接口要求

Inteceptor需要实现org.apache.flume.interceptor.Interceptor接口,实际配置到Flume配置文件中的是Interceptor的构造类,因此,还需要实现org.apache.flume.interceptor.Interceptor.Builder接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.apache.flume.interceptor;

public interface Interceptor {
public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();

public interface Builder extends Configurable {
public Interceptor build();
}
}

package org.apache.flume.conf;

public interface Configurable {
public void configure(Context context);
}

maven依赖为

1
2
3
4
5
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>{flume_version}</version>
</dependency>

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package jeff.flume.interceptor.timeExtractInterceptor;

...

public class TimeExtractInterceptor implements Interceptor {
private Builder builder;
private ThreadLocal<SimpleDateFormat> format;

private SimpleDateFormat getFormat() {
SimpleDateFormat fmt = format.get();
if (null == fmt) {
fmt = new SimpleDateFormat(this.builder.getTimeFormat());
format.set(fmt);
}
return fmt;
}

@Override
public Event intercept(Event event) {
String content = new String(event.getBody());
Matcher matcher = this.builder.getTimePattern().matcher(content);
if (matcher.find()) {
try {
event.getHeaders().put("timestamp", getFormat().parse(matcher.group(0)).getTime() + "");
} catch (ParseException e) {
logger.debug("abandon: " + content);
return null;
}
return event;
} else {
logger.debug("abandon: " + content);
return null;
}
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> results = new ArrayList<Event>();
for (Event event : events) {
Event e = intercept(event);
if (null != e) {
results.add(event);
}
}
return results;
}

@Override
public void close() {
format.remove();
}

@Override
public void initialize() {
format = new ThreadLocal<SimpleDateFormat>();
}

public static class Builder implements Interceptor.Builder {

...

public void configure(Context context) {
String timeFormat = context.getString("TimeFormat", this.TimeFormat);
logger.info("read TimeFormat: " + this.TimeFormat);
this.setTimeFormat(timeFormat);
}

@Override
public Interceptor build() {
return new TimeExtractInterceptor(this);
}
}
}

需要注意以下几个问题

  1. Interceptor的运行环境可能是多线程的,因此两个重载的intercept函数应该是线程安全的,上述代码在对提取出的时间字符串进行解析的时候使用了SimpleDateFormat类,由于其非线程安全,因此使用ThreadLocal将其绑定到线程中;
  2. Interceptor在调用intercept(Event)函数时,返回空为过滤输入Event,调用intercept(List)时,未添加到返回列表中的Event被过滤。

打包

Interceptor打包方式与常见jar包的生成不同,需要遵循Flume插件的部署规则,尽管可以使用jar-with-dependences的方式将Interceptor和依赖打包成一个文件发布到Flume的classpath中,随着插件集成越来越多,Flume的插件会越来越难以管理。

遵循Flume插件的文件结构规范,使用maven-assembly-plugin自定义打包规范,maven的pom.xml文件中增加以下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
</configuration>

<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

assembly.xml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">

<formats>
<format>tar.gz</format>
</formats>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>/libext</outputDirectory>
</dependencySet>
</dependencySets>

<fileSets>
<fileSet>
<directory>src/main/java/resource/</directory>
<outputDirectory>/native</outputDirectory>
<includes>
<include>*.so</include>
</includes>
</fileSet>
</fileSets>

<files>
<file>
<source>target/${artifactId}-${version}.jar</source>
<outputDirectory>/lib</outputDirectory>
</file>
</files>

</assembly>

将assembley.xml放置到指定位置

1
$ mvn clean package

将target目录下生成的压缩包解压到flume目录的plugins中即可配置使用Interceptor了。

1
2
3
4
a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = jeff.flume.interceptor.timeExtractInterceptor.TimeExtractInterceptor$Builder
a1.sources.r1.interceptors.i1.TimeFormat = "yyyy/MM/dd HH:mm:ss"
源码