重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

FlumeNG学习笔记(八)Interceptors(拦截器)测试-创新互联

版权声明:本文为博主原创文章,未经博主允许不得转载。

成都创新互联公司坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站制作、成都网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的灵璧网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

目录(?)[+]

拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据

一、Event Serializers

file_roll sink 和hdfs sink 都支持EventSerializer接口

1.1、Body Text Serializer

Body TextSerializer,别名:text。这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改。事件的header将直接被忽略。

下面是官网配置:

Property Name

Default

Description

appendNewline

true

Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

下面是官网例子:appendNewline是选择是否加入到新行去。默认是true,而false 就是换行,一般我们都选择换行。

a1.sinks=k1

a1.sinks.k1.type=file_roll

a1.sinks.k1.channel=c1

a1.sinks.k1.sink.directory=/var/log/flume

a1.sinks.k1.sink.serializer=text

a1.sinks.k1.sink.serializer.appendNewline=false

下面是实际例子

因为是考虑Body TextSerializer的特性,他会忽略header的信息,因此我们这边要采用http source来接收定义的header 与body 的内容

[html] view plain

  1. #配置文件:body_case15.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = http

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. # Describe the sink

  12. a1.sinks.k1.type = file_roll

  13. a1.sinks.k1.channel = c1

  14. a1.sinks.k1.sink.directory = /tmp/logs

  15. a1.sinks.k1.sink.serializer = text

  16. a1.sinks.k1.sink.serializer.appendNewline =false

  17. # Use a channel which buffers events inmemory

  18. a1.channels.c1.type = memory

  19. a1.channels.c1.capacity = 1000

  20. a1.channels.c1.transactionCapacity = 100

#敲命令

flume-ng agent -c conf -fconf/body_case15.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

curl -X POST -d '[{"headers":{"looklook1" : "looklook1 isheader","looklook2": "looklook2 isheader"},"body" : "hellolooklook5"}]'http://192.168.233.128:50000

#在启动源发送的代理终端查看console输出

Flume NG 学习笔记(八)Interceptors(拦截器)测试

数据已经输出,但只输出了hello looklook5,即BODY这块。

1.2、Avro Event Serializer

Avro Event Serializer别名:avro_event。这个拦截器将把事件序列化到一个Avro容器文件中。使用的模式和RPC Avro机制使用到的处理flume事件的机制一样。这个序列化器继承自AbstractAvroEventSerializer类。

官网例子

Property Name

Default

Description

syncIntervalBytes

2048000

Avro sync interval, in approximate bytes.

compressionCodec

null

Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

下面是官网例子

a1.sinks.k1.type=hdfs

a1.sinks.k1.channel=c1

a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.serializer=avro_event

a1.sinks.k1.serializer.compressionCodec=snappy

例子这边就不演示了,因为和BodyText Serializer 差距不大。

二、Timestamp Interceptor

官网说Flume 可以在事件传输过程中对它进行修改与删除,而这个都是通过Interceptor进行实现的,实际都是往事件的header里插数据。而Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。

下面是官网配置

Property Name

Default

Description

type

The component type name, has to be timestamp or the FQCN

preserveExisting

false

If the timestamp already exists, should it be preserved - true or false

以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=timestamp

下面是测试例子

[html] view plain

  1. #配置文件:timestamp_case16.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = syslogtcp

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. a1.sources.r1.interceptors = i1

  12. a1.sources.r1.interceptors.i1.preserveExisting= false

  13. a1.sources.r1.interceptors.i1.type = timestamp

  14. # Describe the sink

  15. a1.sinks.k1.type = hdfs

  16. a1.sinks.k1.channel = c1

  17. a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M

  18. a1.sinks.k1.hdfs.filePrefix = looklook5.

  19. a1.sinks.k1.hdfs.fileType=DataStream

  20. # Use a channel which buffers events inmemory

  21. a1.channels.c1.type = memory

  22. a1.channels.c1.capacity = 1000

  23. a1.channels.c1.transactionCapacity = 100

这里拿header作为文件夹目录名称。

#敲命令

flume-ng agent -c conf -f conf/timestamp_case16.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "TimestampInterceptor" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

查看hdfs生成的文件,可以看到timestamp已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

Flume NG 学习笔记(八)Interceptors(拦截器)测试

三、Host Interceptor

该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)

下面是官网配置

Property Name

Default

Description

type

The component type name, has to be host

preserveExisting

false

If the host header already exists, should it be preserved - true or false

useIP

true

Use the IP Address if true, else use hostname.

hostHeader

host

The header key to be used.

以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.hostHeader=hostname

下面是测试例子

[html] view plain

  1. #配置文件:time_host_case17.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = syslogtcp

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. a1.sources.r1.interceptors = i1 i2

  12. a1.sources.r1.interceptors.i1.preserveExisting= false

  13. a1.sources.r1.interceptors.i1.type =timestamp

  14. a1.sources.r1.interceptors.i2.type = host

  15. a1.sources.r1.interceptors.i2.hostHeader =hostname

  16. a1.sources.r1.interceptors.i2.useIP = false

  17. # Describe the sink

  18. a1.sinks.k1.type = hdfs

  19. a1.sinks.k1.channel = c1

  20. a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M

  21. a1.sinks.k1.hdfs.filePrefix = %{hostname}

  22. a1.sinks.k1.hdfs.fileType=DataStream

  23. # Use a channel which buffers events inmemory

  24. a1.channels.c1.type = memory

  25. a1.channels.c1.capacity = 1000

  26. a1.channels.c1.transactionCapacity = 100

增加一个拦截器,类型是host,h将hostname作为文件前缀。

#敲命令

flume-ng agent -c conf -f conf/time_host_case17.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "Time&hostInterceptor1" | nc 192.168.233.128 50000

echo "Time&hostInterceptor2" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

查看hdfs生成的文件,可以看到host已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

Flume NG 学习笔记(八)Interceptors(拦截器)测试

四、Static Interceptor

Static Interceptor拦截器允许用户增加一个static的header并为所有的事件赋值。范围是所有事件。

官网配置

Property Name

Default

Description

type

The component type name, has to be static

preserveExisting

true

If configured header already exists, should it be preserved - true or false

key

key

Name of header that should be created

value

value

Static value that should be created

其中参数key与value等于类似json格式里的"headers":{" key":" value"}

下面是官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=static

a1.sources.r1.interceptors.i1.key=datacenter

a1.sources.r1.interceptors.i1.value=NEW_YORK

以及实际的列子

[html] view plain

  1. #配置文件:static_case18.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = syslogtcp

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. a1.sources.r1.interceptors = i1

  12. a1.sources.r1.interceptors.i1.type = static

  13. a1.sources.r1.interceptors.i1.key = looklook5

  14. a1.sources.r1.interceptors.i1.value =looklook10

  15. # Describe the sink

  16. a1.sinks.k1.type = logger

  17. # Use a channel which buffers events inmemory

  18. a1.channels.c1.type = memory

  19. a1.channels.c1.capacity = 1000

  20. a1.channels.c1.transactionCapacity = 100

  21. # Bind the source and sink to the channel

  22. a1.sources.r1.channels = c1

  23. a1.sinks.k1.channel = c1

#敲命令

flume-ng agent -c conf -f conf/static_case18.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "statInterceptor1" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

可以看到输出的header信息里自定义部分正确输出,body部分也输出正确。

Flume NG 学习笔记(八)Interceptors(拦截器)测试

五、Regex FilteringInterceptor

Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。

官网配置

Property Name

Default

Description

type

The component type name has to be regex_filter

regex

”.*”

Regular expression for matching against events

excludeEvents

false

If true, regex determines events to exclude, otherwise regex determines events to include.

excludeEvents 为true的时候为排除所有匹配正则表达式的数据。

下面是测试例子

[html] view plain

  1. #配置文件:regex_filter_case19.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1

  5. a1.channels = c1

  6. # Describe/configure the source

  7. a1.sources.r1.type = syslogtcp

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.channels = c1

  11. a1.sources.r1.interceptors = i1

  12. a1.sources.r1.interceptors.i1.type =regex_filter

  13. a1.sources.r1.interceptors.i1.regex =^[0-9]*$

  14. a1.sources.r1.interceptors.i1.excludeEvents =true

  15. # Describe the sink

  16. a1.sinks.k1.type = logger

  17. # Use a channel which buffers events inmemory

  18. a1.channels.c1.type = memory

  19. a1.channels.c1.capacity = 1000

  20. a1.channels.c1.transactionCapacity = 100

  21. # Bind the source and sink to the channel

  22. a1.sources.r1.channels = c1

  23. a1.sinks.k1.channel = c1

我们对开头字母是数字的数据,全部过滤。

#敲命令

flume-ng agent -c conf -f conf/regex_filter_case19.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "a" | nc192.168.233.128 50000

echo "1222" |nc 192.168.233.128 50000

echo "a222" |nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

Flume NG 学习笔记(八)Interceptors(拦截器)测试

可以看出1222 被认为是无效的数据没有发出来。

Regex Filtering Interceptor测试成功。

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


分享题目:FlumeNG学习笔记(八)Interceptors(拦截器)测试-创新互联
文章地址:http://cqcxhl.cn/article/cspesj.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP