
Flume Log Collection


1. Log4j Appender

1.1 Usage

1.1.2 Client-side Log4j configuration file

(Highlighted text marks the settings that need to be configured.)

log4j.rootLogger=INFO,A1,R

# ConsoleAppender out
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %-10C{1} %m%n

# File out
# Change the log appender to the Log4jAppender provided by flume
log4j.appender.R=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.R.File=${catalina.home}/logs/ultraIDCPServer.log
# The port the logs are sent to; an AVRO-type source must be listening on it
log4j.appender.R.Port=44444
# The IP of the host the logs are sent to; the AVRO-type source runs on this host
log4j.appender.R.Hostname=localhost
log4j.appender.R.MaxFileSize=102400KB
# log4j.appender.R.MaxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p %-10C{1} %m%n
log4j.appender.R.encoding=UTF-8

log4j.logger.com.ultrapower.ultracollector.webservice.MessageIntercommunionInterfaceImpl=INFO,webservice
log4j.appender.webservice=org.apache.log4j.FileAppender
log4j.appender.webservice.File=${catalina.home}/logs/logsMsgIntercommunionInterface.log
log4j.appender.webservice.layout=org.apache.log4j.PatternLayout
log4j.appender.webservice.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p [%t] %l %X - %m%n
log4j.appender.webservice.encoding=UTF-8

Note: Log4jAppender extends AppenderSkeleton and has no built-in support for rolling to a new file once the log file reaches a given size. A possible workaround is sketched below.
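Since the appender cannot roll files itself, a possible workaround (a sketch, not part of the original setup; the appender names LOCAL and FLUME are illustrative) is to keep a local RollingFileAppender for the on-disk copy and attach the flume Log4jAppender only for shipping events:

log4j.rootLogger=INFO,LOCAL,FLUME
# Rolling on-disk copy of the log
log4j.appender.LOCAL=org.apache.log4j.RollingFileAppender
log4j.appender.LOCAL.File=${catalina.home}/logs/ultraIDCPServer.log
log4j.appender.LOCAL.MaxFileSize=100MB
log4j.appender.LOCAL.MaxBackupIndex=5
log4j.appender.LOCAL.layout=org.apache.log4j.PatternLayout
log4j.appender.LOCAL.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p %-10C{1} %m%n
# Flume appender only ships events; no File/MaxFileSize settings needed
log4j.appender.FLUME=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.FLUME.Hostname=localhost
log4j.appender.FLUME.Port=44444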

1.1.3 Flume agent configuration

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = 192.168.0.141
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Note: a new file is generated at a fixed time interval; each file holds the events the agent received during that interval. The interval is configurable, as sketched below.
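The roll interval of the FILE_ROLL sink is configurable. A minimal sketch, assuming a 60-second roll is wanted instead of the 30-second default:

agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
# Roll to a new file every 60 seconds (default 30; 0 disables time-based rolling)
agent1.sinks.sink1.sink.rollInterval = 60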

1.2 Analysis

1. Simple to use; the required work is small.
2. The application must use log4j as its logging jar, and the log4j jar used in the project must be version 1.2.15 or above.
3. The application must add the jars flume requires to the project (the full jar list is not reproduced here); jar conflicts are possible and may affect the running application.
4. Reliable data transfer can be provided. Collecting logs with the flume Log4jAppender requires no process on the client machine: modifying the log appender alone sends log events directly to the collector machine (see Figure 1). Data is then guaranteed once the collector has received it, but events are lost while the client cannot reach the collector. The improved scheme starts an agent on the client machine, so that when the client and collector are disconnected, logs are still collected once the connection recovers and no data is lost (see Figure 2). For this reliability a process must run on the client; a sketch of such a client-side agent follows.

[Figures 1 and 2: deployment diagrams, not reproduced here]
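A minimal sketch of the client-side agent, assuming the collector's avro source listens on 192.168.0.141:44444 and the application's Log4jAppender points at localhost:44445 (all names and ports here are illustrative):

# Local agent on the client machine (the "Figure 2" layout)
client.sources = r1
client.sinks = k1
client.channels = c1
# Avro source that the application's Log4jAppender sends to
client.sources.r1.type = avro
client.sources.r1.bind = localhost
client.sources.r1.port = 44445
# Avro sink forwarding to the collector machine
client.sinks.k1.type = avro
client.sinks.k1.hostname = 192.168.0.141
client.sinks.k1.port = 44444
# Memory channel buffers events while the collector is unreachable;
# it does not survive agent restarts
client.channels.c1.type = memory
client.channels.c1.capacity = 1000
client.channels.c1.transactionCapacity = 100
client.sources.r1.channels = c1
client.sinks.k1.channel = c1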

1.3 Logging code

Log.info("this message has DEBUG in it");

1.4 Sample of collected data

this message has DEBUG in it

this message has DEBUG in it


2. Exec Source (abandoned)

The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there's an obvious problem; what happens if the channel fills up and Flume can't send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn't been sent, for some reason. If this doesn't make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. You have been warned.

Note: not even reliability inside the agent can be guaranteed.

2.1 Usage

2.1.1 Flume agent configuration

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
#agent1.sources.source1.type = avro
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log
#agent1.sources.source1.bind = 192.168.0.146
#agent1.sources.source1.port = 44444

agent1.sources.source1.interceptors = a
agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.a.preserveExisting = false
agent1.sources.source1.interceptors.a.hostHeader = hostname

# Describe sink1
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/user/
agent1.sinks.sink1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

2.2 Analysis

1. tail-based collection requires the host OS to support the tail command; in practice only Linux can run it, so stock Windows systems cannot have their logs collected this way.
2. The exec source collects asynchronously, so logs can be lost; even data inside a node cannot be guaranteed complete.

2.3 Sample of collected data

2012/10/26 02:36:34 INFO LogTest this message has DEBUG 中文 in it

2012/10/26 02:40:12 INFO LogTest this message has DEBUG 中文 in it

2.4 Logging code

Log.info("this message has DEBUG 中文 in it");

3. Syslog

Passing messages using syslog protocol doesn't work well for longer messages. The syslog appender for Log4j is hardcoded to linewrap around 1024 characters in order to comply with the RFC. I got a sample program logging to syslog, picking it up with a syslogUdp source, with a JSON layout (to avoid new-lines in stack traces) only to find that anything but the smallest stack trace line-wrapped anyway. I can't see a way to reliably reconstruct the stack trace once it is wrapped and sent through the flume chain. (Note: it is not certain this applies to version 1.2.)

Syslog TCP requires an eventSize to be specified; the default is 2500.

Syslog UDP is an unreliable transport; data may be lost during transmission.
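If longer messages are expected, the TCP source's event size limit can be raised. A minimal sketch, assuming messages up to 4 KB (eventSize is in bytes):

agent1.sources.source1.type = syslogtcp
agent1.sources.source1.bind = 127.0.0.1
agent1.sources.source1.port = 5140
# Maximum event size in bytes (default 2500)
agent1.sources.source1.eventSize = 4096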

3.1 Usage

3.1.1 Client-side sample code

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class SyslogTcp {
    public static void main(String[] args) {
        Socket client = null;
        OutputStream out = null;
        try {
            client = new Socket("127.0.0.1", 5140);
            out = client.getOutputStream();
            // <4> is the syslog priority field; the message must end with \n
            String event = "<4>hello\n";
            out.write(event.getBytes());
            out.flush();
            System.out.println("Send succeeded");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Null checks guard against an NPE when the connection failed
            // before the stream was opened
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (client != null) {
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.1.2 Flume agent configuration for receiving logs

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = syslogtcp
agent1.sources.source1.bind = 127.0.0.1
agent1.sources.source1.port = 5140

# Describe sink1
#agent1.sinks.sink1.type = avro
#agent1.sinks.sink1.channels = channel1
#agent1.sinks.sink1.hostname = 192.168.0.144
#agent1.sinks.sink1.port = 44444

agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = E:\\file-out

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

3.2 Analysis

Client-side collection code must be written; it gathers log increments and sends them to the flume agent over a socket. Handling of long messages is not ideal. Reliability can be guaranteed in the same way as with the log4j appender approach.

4. Log Filtering Interceptor (FLUME-1358)

Flume supports filtering events by regular expression, but the 1.2.0 source code contains no concrete implementation. According to the notes on FLUME-1358, the RegexFilteringInterceptor class can be added to the code base and used.

The required steps are:

Add the RegexFilteringInterceptor class.

Modify InterceptorType to add the mapping between the type name and the class:

REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)

4.1 RegexFilteringInterceptor description

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.

Property Name (Default) — Description
type — The component type name; has to be REGEX_FILTER
regex (".*") — Regular expression for matching against events
excludeEvents (false) — If true, regex determines events to exclude; otherwise regex determines events to include

4.2 Usage (test configuration)

4.2.1 Flume agent configuration for receiving logs

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 5140

agent1.sources.source1.interceptors = inter1
agent1.sources.source1.interceptors.inter1.type = REGEX_FILTER
agent1.sources.source1.interceptors.inter1.regex = .*DEBUG.*
agent1.sources.source1.interceptors.inter1.excludeEvents = false

# Describe sink1
#agent1.sinks.sink1.type = avro
#agent1.sinks.sink1.channels = channel1
#agent1.sinks.sink1.hostname = 192.168.0.144
#agent1.sinks.sink1.port = 44444

agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = E:\\file-out

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

5. HDFS Sink

5.1 Usage

Data written to HDFS is first created as a .tmp file; when the file is closed, the .tmp suffix is removed. The storage scheme is similar to the file sink's: a time interval, a file size, or a number of received events can be set as the trigger for rolling to a new file (the default is 30 s), as sketched below.

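A minimal sketch of the three roll triggers (the values are illustrative; setting a trigger to 0 disables it):

agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file
# Roll every 60 seconds (default 30)
agent1.sinks.sink1.hdfs.rollInterval = 60
# Roll once the file reaches 1 MB (default 1024 bytes)
agent1.sinks.sink1.hdfs.rollSize = 1048576
# Never roll on event count (default 10)
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.fileType = DataStream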

5.2 Configurable properties

Name (Default) — Description
channel
type — The component type name; needs to be hdfs
hdfs.path — HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix (FlumeData) — Name prefixed to files created by Flume in the hdfs directory
hdfs.rollInterval (30) — Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
hdfs.rollSize (1024) — File size to trigger a roll, in bytes (0 = never roll based on file size)
hdfs.rollCount (10) — Number of events written to the file before it is rolled (0 = never roll based on number of events)
hdfs.batchSize (1) — Number of events written to the file before it is flushed to HDFS
hdfs.txnEventMax (100)
hdfs.codeC — Compression codec, one of: gzip, bzip2, lzo, snappy
hdfs.fileType (SequenceFile) — File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles (5000)
hdfs.writeFormat — "Text" or "Writable"
hdfs.appendTimeout (1000)
hdfs.callTimeout (10000)
hdfs.threadsPoolSize (10) — Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize (1) — Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal — Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab — Kerberos keytab for accessing secure HDFS
hdfs.round (false) — Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
hdfs.roundValue (1) — Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than the current time
hdfs.roundUnit (second) — The unit of the round-down value: second, minute or hour
serializer (TEXT) — Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
serializer.*

5.3 Sample agent configuration

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
#agent1.sources.source1.type = avro
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log
#agent1.sources.source1.bind = 192.168.0.146
#agent1.sources.source1.port = 44444

agent1.sources.source1.interceptors = a
agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.a.preserveExisting = false
agent1.sources.source1.interceptors.a.hostHeader = hostname

# Describe sink1
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent1.sinks.sink1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

6. Multiple agents collecting files to HDFS

6.1 Preparation

1. Package the file-collection classes into a jar and place it in the flume/apache-flume-1.2.0/lib directory.
2. Create an empty fileSourceRecorder.properties file under flume/apache-flume-1.2.0/conf (this will be changed so that the file is created automatically when it does not exist, after which this step will no longer be necessary).

6.2 Agent configuration files

6.2.1 agent1

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent1.sources.source1.path = /home/yubojie/logs/ultraIDCPServer.log
# gbk, utf-8
agent1.sources.source1.encoding = utf-8
agent1.sources.source1.onceMaxReadByte = 999
agent1.sources.source1.cacheQueueSize = 10
agent1.sources.source1.noChangeSleepTime = 1000
agent1.sources.source1.batchCommitSize = 5
agent1.sources.source1.batchWaitTime = 500

#agent1.sources.source1.type = avro
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Describe sink1
#agent1.sinks.sink1.type = logger
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = E:/file-out
#agent1.sinks.sink1.sink.fileName = a.log
agent1.sinks.sink1.type = hdfs
#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent1.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file
agent1.sinks.sink1.hdfs.callTimeout = 20000
agent1.sinks.sink1.hdfs.fileType = DataStream
#agent1.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

########################## test method ########################################

######### start flume agent #########
# agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle

######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'

6.2.2 agent2

# example.conf: A single-node Flume configuration

# Name the components on this agent
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

# Describe/configure source1
agent2.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent2.sources.source1.path = /home/yubojie/logtest/logs/ultraIDCPServer.log
# gbk, utf-8
agent2.sources.source1.encoding = utf-8
agent2.sources.source1.onceMaxReadByte = 999
agent2.sources.source1.cacheQueueSize = 10
agent2.sources.source1.noChangeSleepTime = 1000
agent2.sources.source1.batchCommitSize = 5
agent2.sources.source1.batchWaitTime = 500

#agent1.sources.source1.type = avro
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Describe sink1
#agent1.sinks.sink1.type = logger
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = E:/file-out
#agent1.sinks.sink1.sink.fileName = a.log
agent2.sinks.sink1.type = hdfs
#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent2.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file
agent2.sinks.sink1.hdfs.callTimeout = 20000
agent2.sinks.sink1.hdfs.fileType = DataStream
#agent1.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 1000
agent2.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1

########################## test method ########################################

######### start flume agent #########
# agent -n agent2 -f .\conf\flume-conf2.properties

######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'

6.3 Startup commands

flume-ng agent -name agent1 -c conf -f ../conf/flume-conf.properties
// agent1 monitors /home/yubojie/logs/ultraIDCPServer.log
flume-ng agent -name agent2 -c conf -f ../conf/flume-conf2.properties
// agent2 monitors /home/yubojie/logtest/logs/ultraIDCPServer.log

6.4 Test results

1. agent1 and agent2 each monitor their own file and do not interfere with each other.
2. Each agent's file is written to HDFS as its own output file.

7. References

Log collection:

https://issues.cloudera.org//browse/FLUME-27

http://archive.cloudera.com/cdh/3/flume-ng-1.2.0-cdh3u5/FlumeUserGuide.html#exec-source

http://www.quora.com/Flume/What-Flume-sources-do-people-use-in-production

http://blog.csdn.net/rzhzhz/article/details/7610252

Filtering: https://issues.apache.org/jira/secure/attachment/12537520/FLUME-1358.patch.v4.txt

https://issues.apache.org/jira/browse/FLUME-1358


RegexFilteringInterceptor source code


package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;

import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class RegexFilteringInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory
      .getLogger(RegexFilteringInterceptor.class);

  private final Pattern regex;
  private final boolean excludeEvents;

  /**
   * Only {@link RegexFilteringInterceptor.Builder} can build me
   */
  private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {
    this.regex = regex;
    this.excludeEvents = excludeEvents;
  }

  @Override
  public void initialize() {
    // no-op
  }

  @Override
  /**
   * Returns the event if it passes the regular expression filter and null
   * otherwise.
   */
  public Event intercept(Event event) {
    // We've already ensured here that at most one of includeRegex and
    // excludeRegex are defined.
    if (!excludeEvents) {
      if (regex.matcher(new String(event.getBody())).find()) {
        return event;
      } else {
        return null;
      }
    } else {
      if (regex.matcher(new String(event.getBody())).find()) {
        return null;
      } else {
        return event;
      }
    }
  }

  /**
   * Returns the set of events which pass filters, according to
   * {@link #intercept(Event)}.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = Lists.newArrayList();
    for (Event event : events) {
      Event outEvent = intercept(event);
      if (outEvent != null) { out.add(outEvent); }
    }
    return out;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instance of the StaticInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private Pattern regex;
    private boolean excludeEvents;

    @Override
    public void configure(Context context) {
      String regexString = context.getString(REGEX, DEFAULT_REGEX);
      regex = Pattern.compile(regexString);
      excludeEvents = context.getBoolean(EXCLUDE_EVENTS,
          DEFAULT_EXCLUDE_EVENTS);
    }

    @Override
    public Interceptor build() {
      logger.info(String.format(
          "Creating RegexFilteringInterceptor: regex=%s, excludeEvents=%s",
          regex, excludeEvents));
      return new RegexFilteringInterceptor(regex, excludeEvents);
    }
  }

  public static class Constants {
    public static final String REGEX = "regex";
    public static final String DEFAULT_REGEX = ".*";
    public static final String EXCLUDE_EVENTS = "excludeEvents";
    public static final boolean DEFAULT_EXCLUDE_EVENTS = false;
  }
}

InterceptorType source code

(The REGEX_FILTER line is the added content.)

package org.apache.flume.interceptor;

public enum InterceptorType {

  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
  HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
  ;

  private final Class<? extends Interceptor.Builder> builderClass;

  private InterceptorType(Class<? extends Interceptor.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends Interceptor.Builder> getBuilderClass() {
    return builderClass;
  }

}


