1. Log4j Appender
1.1. Usage
(In the original, the settings that need to be configured were highlighted in yellow.)

log4j.rootLogger=INFO,A1,R

# ConsoleAppender out
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %-10C{1} %m%n

# File out
# Change the log appender to the Log4jAppender provided by Flume
log4j.appender.R=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.R.File=${catalina.home}/logs/ultraIDCPServer.log
# Port the logs are sent to; an Avro source must be listening on this port
log4j.appender.R.Port=44444
# IP of the host the logs are sent to; this host runs the Avro source
log4j.appender.R.Hostname=localhost
log4j.appender.R.MaxFileSize=102400KB
# log4j.appender.R.MaxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p %-10C{1} %m%n
log4j.appender.R.encoding=UTF-8

log4j.logger.com.ultrapower.ultracollector.webservice.MessageIntercommunionInterfaceImpl=INFO, webservice
log4j.appender.webservice=org.apache.log4j.FileAppender
log4j.appender.webservice.File=${catalina.home}/logs/logsMsgIntercommunionInterface.log
log4j.appender.webservice.layout=org.apache.log4j.PatternLayout
log4j.appender.webservice.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p [%t] %l %X - %m%n
log4j.appender.webservice.encoding=UTF-8

Note: Log4jAppender extends AppenderSkeleton and cannot roll over to a new file when the log file reaches a given size.
1.1.3. Flume agent configuration
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind =
agent1.sources.source1.port = 44444

# Describe sink1
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Note: the sink starts a new file at a fixed time interval; each file holds the events the agent received during that interval.
1.2. Analysis
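Provides reliable data transfer. With Flume's Log4jAppender, logs can be collected without starting any process on the client machine: only the log appender is changed, and log events are sent directly to the collector machine (see Figure 1). This guarantees reliability once the collector has received the data, but data is lost whenever the client fails to connect to the collector. The improvement is to start an agent on the client machine, so that if the client and the collector cannot communicate, the logs are still collected once the connection is restored and no data is lost (see Figure 2). Reliability therefore requires running a process on the client machine.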
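The improvement described above (an agent on the client that holds events until the collector is reachable) can be illustrated with a small store-and-forward queue. This is a minimal sketch, not Flume's implementation: the class name `StoreAndForward` and the `send` callback are hypothetical, and the queue lives in memory, so, like the memory channel configured in this document, it would not survive a process restart.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/**
 * Hypothetical store-and-forward buffer on the client: events stay queued
 * until the collector accepts them, so a broken connection delays delivery
 * instead of losing data.
 */
public class StoreAndForward {
    private final Deque<String> pending = new ArrayDeque<>();
    // Returns true if the collector accepted the event, false if it was unreachable.
    private final Predicate<String> send;

    public StoreAndForward(Predicate<String> send) {
        this.send = send;
    }

    /** Queues an event locally instead of sending it straight to the collector. */
    public void append(String event) {
        pending.addLast(event);
    }

    /**
     * Delivers queued events in order and stops at the first failure; the
     * failed event and everything after it stay queued for the next call.
     * Returns the number of events delivered by this call.
     */
    public int flush() {
        int delivered = 0;
        while (!pending.isEmpty()) {
            if (!send.test(pending.peekFirst())) {
                break; // collector unreachable: keep the event and retry later
            }
            pending.removeFirst();
            delivered++;
        }
        return delivered;
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

Removing an event only after `send` succeeds is what turns a connection failure into a delay rather than a loss.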
1.3. Logging code
Log.info("this message has DEBUG in it");
1.4. Sample of collected data
this message has DEBUG in it
this message has DEBUG in it
2. Exec source (abandoned)
The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there's an obvious problem; what happens if the channel fills up and Flume can't send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn't been sent, for some reason. If this doesn't make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. You have been warned.
Note: even reliability within the agent itself cannot be guaranteed.
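The failure mode the quote describes can be reproduced with a bounded queue standing in for the channel. This sketch does not reproduce ExecSource's code; `TailDropDemo` is a hypothetical name, and `ArrayBlockingQueue.offer` simply plays the role of a channel that is full. The rejection is visible only to the code that tails the file, never to the application that wrote the line.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch: a tail-style reader feeding a bounded "channel" with no way to signal the writer. */
public class TailDropDemo {
    /** Offers each line to the bounded queue and returns how many lines were dropped. */
    public static int pushLines(BlockingQueue<String> channel, String... lines) {
        int dropped = 0;
        for (String line : lines) {
            // offer() returns false immediately when the queue is full; the
            // application that wrote this line to the log file never finds out.
            if (!channel.offer(line)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(2);
        int dropped = pushLines(channel, "line1", "line2", "line3");
        System.out.println("queued=" + channel.size() + " dropped=" + dropped);
    }
}
```

Running `main` prints `queued=2 dropped=1`: the third line is gone, and the log file gives no hint of it.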
2.1. Usage
2.1.1. Flume agent configuration
2.2. Analysis
2.3. Sample of collected data
2012/10/26 02:36:34 INFO  LogTest    this message has DEBUG 中文 in it
2012/10/26 02:40:12 INFO  LogTest    this message has DEBUG 中文 in it
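The sample lines appear to follow the ConversionPattern `%d{yyyy/MM/dd HH:mm:ss} %-5p %-10C{1} %m%n` configured in section 1.1. That pattern explains the two spaces after INFO and the padding after LogTest. The sketch below approximates the layout with `String.format` to show the padding rules. It is not Log4j's PatternLayout, and the class name `PatternLayoutSketch` is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PatternLayoutSketch {
    private static final DateTimeFormatter DATE =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");

    /**
     * Approximates "%d{yyyy/MM/dd HH:mm:ss} %-5p %-10C{1} %m"; the trailing
     * %n line separator is left out.
     */
    public static String format(LocalDateTime time, String level,
                                String fullyQualifiedClass, String message) {
        // %C{1}: keep only the last component of the class name
        String simpleName = fullyQualifiedClass.substring(
                fullyQualifiedClass.lastIndexOf('.') + 1);
        // %-5p and %-10C{1}: left-justify and pad with spaces to the minimum width
        return String.format("%s %-5s %-10s %s",
                DATE.format(time), level, simpleName, message);
    }

    public static void main(String[] args) {
        // "\u4e2d\u6587" is the two Chinese characters used in the sample message
        System.out.println(format(LocalDateTime.of(2012, 10, 26, 2, 36, 34),
                "INFO", "LogTest", "this message has DEBUG \u4e2d\u6587 in it"));
    }
}
```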
2.4. Logging code
Log.info("this message has DEBUG 中文 in it");
3. Syslog
Passing messages using syslog protocol doesn't work well for longer messages. The syslog appender for Log4j is hardcoded to line-wrap around 1024 characters in order to comply with the RFC. I got a sample program logging to syslog, picking it up with a syslogUdp source, with a JSON layout (to avoid new-lines in stack traces) only to find that anything but the smallest stack trace line-wrapped anyway. I can't see a way to reliably reconstruct the stack trace once it is wrapped and sent through the flume chain. (Note: it is not certain whether this applies to version 1.2.)
- For Syslog TCP, the event size (eventSize) must be specified; the default is 2500 bytes.
- Syslog UDP is an unreliable transport; data may be lost in transit.
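The SyslogTcp client in section 3.1 sends lines such as `<4>hello\n`, where `<4>` is the syslog PRI value (facility * 8 + severity, per RFC 3164). The sketch below builds such a line and checks it against an event size limit. The class and method names are hypothetical. It assumes the receiving source splits events on newlines, as the trailing `\n` in the client suggests, and it approximates the limit by counting the whole framed line in UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;

public class SyslogFrame {
    /** Default event size limit for Syslog TCP, per the note above. */
    public static final int DEFAULT_EVENT_SIZE = 2500;

    /** PRI = facility * 8 + severity (RFC 3164). */
    public static int pri(int facility, int severity) {
        if (facility < 0 || facility > 23 || severity < 0 || severity > 7) {
            throw new IllegalArgumentException("facility must be 0-23, severity 0-7");
        }
        return facility * 8 + severity;
    }

    /** Builds one newline-terminated syslog line, like "<4>hello\n". */
    public static String frame(int facility, int severity, String message) {
        // Assumption: newlines delimit events, so embedded newlines (for example
        // in stack traces) would split one message into several events.
        String singleLine = message.replace("\r", " ").replace("\n", " ");
        return "<" + pri(facility, severity) + ">" + singleLine + "\n";
    }

    /** True if the framed line fits within eventSize bytes when encoded as UTF-8. */
    public static boolean fits(String framed, int eventSize) {
        return framed.getBytes(StandardCharsets.UTF_8).length <= eventSize;
    }
}
```

Flattening newlines keeps a stack trace in one event, but a line longer than the limit still has to be shortened or split before it is sent.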
3.1. Usage
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;

public class SyslogTcp {
    public static void main(String[] args) {
        // Host of the Flume syslog TCP source (left blank here); port 5140.
        // try-with-resources closes the stream and the socket, and never
        // dereferences them if the connection could not be opened.
        try (Socket client = new Socket("", 5140);
             OutputStream out = client.getOutputStream()) {
            // "<4>" is the syslog PRI field; the trailing newline ends the event.
            String event = "<4>hello\n";
            out.write(event.getBytes(StandardCharsets.UTF_8));
            out.flush();
            System.out.println("Sent successfully");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
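3.1.2. Flume agent configuration for receiving logs
3.2. Analysis
Client-side collection code has to be written that reads log data incrementally and sends it over a socket to the Flume agent; long messages are not handled well. Reliability can be ensured in the same way as with the Log4j appender approach (see the analysis in 1.2).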
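Incremental collection means remembering how far the file has already been read and sending only what was appended since. Below is a minimal sketch. It assumes a single UTF-8 log file that is only appended to, or truncated on rotation. The class `IncrementalReader` is hypothetical, and a real collector would also persist the offset and cap how much it reads per pass.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical incremental reader: remembers the byte offset already consumed
 * and returns only complete lines appended since the previous call.
 */
public class IncrementalReader {
    private final String path;
    private long offset = 0; // bytes of the file already turned into lines

    public IncrementalReader(String path) {
        this.path = path;
    }

    public List<String> readNewLines() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (length < offset) {
                offset = 0; // file was truncated or rotated: start from the beginning
            }
            // Reads everything new in one pass (fine for a sketch, not for huge files).
            byte[] buf = new byte[(int) (length - offset)];
            file.seek(offset);
            file.readFully(buf);
            int lineStart = 0;
            for (int i = 0; i < buf.length; i++) {
                // '\n' never occurs inside a multi-byte UTF-8 sequence, so splitting bytes is safe
                if (buf[i] == '\n') {
                    lines.add(new String(buf, lineStart, i - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            // A trailing partial line stays unread until its '\n' arrives.
            offset += lineStart;
        }
        return lines;
    }
}
```

Each returned line would then be sent to the agent, for example with a socket client like the SyslogTcp example in section 3.1.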
4. Log filtering interceptor (FLUME-1358)
4.1. RegexFilteringInterceptor overview
This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.
Property Name | Default | Description
type | – | The component type name has to be REGEX_FILTER
regex | ".*" | Regular expression for matching against events
excludeEvents | false | If true, regex determines events to exclude; otherwise regex determines events to include.
4.2. Usage (test configuration)
4.2.1. Flume agent configuration for receiving logs
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 5140

agent1.sources.source1.interceptors = inter1
agent1.sources.source1.interceptors.inter1.type = REGEX_FILTER
agent1.sources.source1.interceptors.inter1.regex = .*DEBUG.*
agent1.sources.source1.interceptors.inter1.excludeEvents = false

# Describe sink1
#agent1.sinks.sink1.type = avro
#agent1.sinks.sink1.channel = channel1
#agent1.sinks.sink1.hostname =
#agent1.sinks.sink1.port = 44444

agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = E:\\file-out

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
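Each interceptor setting is read from the keys under its own prefix (`agent1.sources.source1.interceptors.inter1.` here). The sketch below imitates that lookup with `java.util.Properties`. It is an illustration, not Flume's configuration code, and `InterceptorConfigSketch` is a hypothetical name. It reads only the `regex` and `excludeEvents` keys, the same names the `Builder.configure` method uses in the RegexFilteringInterceptor source code in this document. Any other key under the prefix, for example `excludeRegex`, therefore has no effect.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

public class InterceptorConfigSketch {
    /** The two settings the Builder reads. */
    public static final class Settings {
        public final Pattern regex;
        public final boolean excludeEvents;

        Settings(Pattern regex, boolean excludeEvents) {
            this.regex = regex;
            this.excludeEvents = excludeEvents;
        }
    }

    /**
     * Reads "<prefix>.regex" and "<prefix>.excludeEvents" with the same
     * defaults as the Builder (".*" and false); other keys are never consulted.
     */
    public static Settings load(String propertiesText, String prefix) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        String regex = props.getProperty(prefix + ".regex", ".*");
        // Properties keeps trailing spaces in values, so trim before parsing the flag
        boolean exclude = Boolean.parseBoolean(
                props.getProperty(prefix + ".excludeEvents", "false").trim());
        return new Settings(Pattern.compile(regex), exclude);
    }
}
```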
5.1. Usage
5.2. Configuration options
Name | Default | Description
channel | – |
type | – | The component type name, needs to be hdfs
hdfs.path | – | HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in the HDFS directory
hdfs.rollInterval | 30 | Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
hdfs.rollSize | 1024 | File size that triggers a roll, in bytes (0 = never roll based on file size)
hdfs.rollCount | 10 | Number of events written to the file before it is rolled (0 = never roll based on number of events)
hdfs.batchSize | 1 | Number of events written to the file before it is flushed to HDFS
hdfs.txnEventMax | 100 |
hdfs.codeC | – | Compression codec; one of: gzip, bzip2, lzo, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream does not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles | 5000 |
hdfs.writeFormat | – | "Text" or "Writable"
hdfs.appendTimeout | 1000 |
hdfs.callTimeout | 10000 |
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS I/O operations (open, write, etc.)
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS
hdfs.round | false | Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
hdfs.roundValue | 1 | Rounded down to the highest multiple of this value (in the unit configured using hdfs.roundUnit) that is less than the current time
hdfs.roundUnit | second | The unit of the round-down value: second, minute or hour
serializer | TEXT | Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.* | |
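The roll settings act as independent triggers, and `hdfs.round` truncates the timestamp used in the path. The sketch below is a simplified model of the semantics in the table, not the HDFS sink's own code. In the model, 0 disables a trigger and rounding goes down to a multiple of `roundValue` units. The class and method names are hypothetical.

```java
public class HdfsRollSketch {
    /**
     * Simplified model of hdfs.rollInterval / rollSize / rollCount:
     * any enabled trigger that is reached causes a roll; 0 disables a trigger.
     */
    public static boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten,
                                     long rollInterval, long rollSize, long rollCount) {
        boolean byTime = rollInterval > 0 && secondsOpen >= rollInterval;
        boolean bySize = rollSize > 0 && bytesWritten >= rollSize;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    /**
     * Model of hdfs.round / roundValue / roundUnit: round a timestamp down to
     * the nearest multiple of roundValue units (unitMillis = length of one unit).
     */
    public static long roundDown(long epochMillis, int roundValue, long unitMillis) {
        long step = roundValue * unitMillis;
        return epochMillis - Math.floorMod(epochMillis, step);
    }
}
```

For example, with roundValue = 10 and roundUnit = minute, 12:37:45 rounds down to 12:30:00, so all events in that ten-minute window share one time-based path.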
5.3. Sample agent configuration
6.1. Preparation
6.2. Agent configuration files
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent1.sources.source1.path = /home/yubojie/logs/ultraIDCPServer.log
#gbk,utf-8
agent1.sources.source1.encoding = utf-8
agent1.sources.source1.onceMaxReadByte = 999
agent1.sources.source1.cacheQueueSize = 10
agent1.sources.source1.noChangeSleepTime = 1000
agent1.sources.source1.batchCommitSize = 5
agent1.sources.source1.batchWaitTime = 500

#agent1.sources.source1.type = avro
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Describe sink1
#agent1.sinks.sink1.type = logger
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = E:/file-out
#agent1.sinks.sink1.sink.fileName = a.log
agent1.sinks.sink1.type = hdfs
#agent1.sinks.sink1.hdfs.path = hdfs://
agent1.sinks.sink1.hdfs.path = hdfs://
agent1.sinks.sink1.hdfs.callTimeout = 20000
agent1.sinks.sink1.hdfs.fileType = DataStream
#agent1.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

########################## test method ########################################
######### start flume agent #########
#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle
######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

# Describe/configure source1
agent2.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent2.sources.source1.path = /home/yubojie/logtest/logs/ultraIDCPServer.log
#gbk,utf-8
agent2.sources.source1.encoding = utf-8
agent2.sources.source1.onceMaxReadByte = 999
agent2.sources.source1.cacheQueueSize = 10
agent2.sources.source1.noChangeSleepTime = 1000
agent2.sources.source1.batchCommitSize = 5
agent2.sources.source1.batchWaitTime = 500

#agent2.sources.source1.type = avro
#agent2.sources.source1.bind = localhost
#agent2.sources.source1.port = 44444

# Describe sink1
#agent2.sinks.sink1.type = logger
#agent2.sinks.sink1.type = FILE_ROLL
#agent2.sinks.sink1.sink.directory = E:/file-out
#agent2.sinks.sink1.sink.fileName = a.log
agent2.sinks.sink1.type = hdfs
#agent2.sinks.sink1.hdfs.path = hdfs://
agent2.sinks.sink1.hdfs.path = hdfs://
agent2.sinks.sink1.hdfs.callTimeout = 20000
agent2.sinks.sink1.hdfs.fileType = DataStream
#agent2.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 1000
agent2.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1

########################## test method ########################################
######### start flume agent #########
#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle
######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'
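`FileSource` is a custom class (`com.ultrapower.ultracollector.flume.source.file.FileSource`) whose code is not included here, so the meaning of its parameters can only be inferred from their names. One plausible reading of `batchCommitSize` and `batchWaitTime` is size-or-time batching. Under that reading, a batch is committed when it holds `batchCommitSize` events, or when its oldest event has waited `batchWaitTime` milliseconds. The sketch below implements only that assumption, and the class `BatchCommitSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical size-or-time batching: a batch is due when it holds
 * batchCommitSize events or its oldest event has waited batchWaitTime ms.
 */
public class BatchCommitSketch {
    private final int batchCommitSize;
    private final long batchWaitTimeMillis;
    private final List<String> batch = new ArrayList<>();
    private long firstEventAt = -1; // arrival time of the oldest event in the batch

    public BatchCommitSketch(int batchCommitSize, long batchWaitTimeMillis) {
        this.batchCommitSize = batchCommitSize;
        this.batchWaitTimeMillis = batchWaitTimeMillis;
    }

    /** Adds an event; returns the batch to commit, or an empty list if none is due. */
    public List<String> add(String event, long nowMillis) {
        if (batch.isEmpty()) {
            firstEventAt = nowMillis;
        }
        batch.add(event);
        return due(nowMillis) ? drain() : new ArrayList<>();
    }

    /** Called periodically (for example while the file is unchanged) to flush an aged batch. */
    public List<String> poll(long nowMillis) {
        return !batch.isEmpty() && due(nowMillis) ? drain() : new ArrayList<>();
    }

    private boolean due(long nowMillis) {
        return batch.size() >= batchCommitSize
                || nowMillis - firstEventAt >= batchWaitTimeMillis;
    }

    private List<String> drain() {
        List<String> out = new ArrayList<>(batch);
        batch.clear();
        firstEventAt = -1;
        return out;
    }
}
```

With the values used above (5 events, 500 ms), a busy log is committed five lines at a time, while a quiet one is still committed within roughly half a second.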
6.3. Startup commands
flume-ng agent -name agent1 -c conf -f ../conf/flume-conf.properties    # agent1 monitors /home/yubojie/logs/ultraIDCPServer.log
flume-ng agent -name agent2 -c conf -f ../conf/flume-conf2.properties   # agent2 monitors /home/yubojie/logtest/logs/ultraIDCPServer.log
6.4. Test results
RegexFilteringInterceptor source code

package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;

import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class RegexFilteringInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory
      .getLogger(RegexFilteringInterceptor.class);

  private final Pattern regex;
  private final boolean excludeEvents;

  /**
   * Only {@link RegexFilteringInterceptor.Builder} can build me
   */
  private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {
    this.regex = regex;
    this.excludeEvents = excludeEvents;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Returns the event if it passes the regular expression filter and null
   * otherwise.
   */
  @Override
  public Event intercept(Event event) {
    // We've already ensured here that at most one of includeRegex and
    // excludeRegex are defined.

    if (!excludeEvents) {
      if (regex.matcher(new String(event.getBody())).find()) {
        return event;
      } else {
        return null;
      }
    } else {
      if (regex.matcher(new String(event.getBody())).find()) {
        return null;
      } else {
        return event;
      }
    }
  }

  /**
   * Returns the set of events which pass filters, according to
   * {@link #intercept(Event)}.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = Lists.newArrayList();
    for (Event event : events) {
      Event outEvent = intercept(event);
      if (outEvent != null) { out.add(outEvent); }
    }
    return out;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instances of the RegexFilteringInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private Pattern regex;
    private boolean excludeEvents;

    @Override
    public void configure(Context context) {
      String regexString = context.getString(REGEX, DEFAULT_REGEX);
      regex = Pattern.compile(regexString);
      excludeEvents = context.getBoolean(EXCLUDE_EVENTS,
          DEFAULT_EXCLUDE_EVENTS);
    }

    @Override
    public Interceptor build() {
      logger.info(String.format(
          "Creating RegexFilteringInterceptor: regex=%s,excludeEvents=%s",
          regex, excludeEvents));
      return new RegexFilteringInterceptor(regex, excludeEvents);
    }
  }

  public static class Constants {
    public static final String REGEX = "regex";
    public static final String DEFAULT_REGEX = ".*";
    public static final String EXCLUDE_EVENTS = "excludeEvents";
    public static final boolean DEFAULT_EXCLUDE_EVENTS = false;
  }
}
InterceptorType source code
The added content (highlighted in yellow in the original) is the REGEX_FILTER entry.

package org.apache.flume.interceptor;

public enum InterceptorType {

  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
  HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
  ;

  private final Class<? extends Interceptor.Builder> builderClass;

  private InterceptorType(Class<? extends Interceptor.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends Interceptor.Builder> getBuilderClass() {
    return builderClass;
  }

}
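The enum entry is what lets a configuration say `type = REGEX_FILTER` instead of a full class name. The sketch below models the lookup with class names as strings, so it runs without Flume on the classpath. A known alias, matched case-insensitively, maps to its builder class, and any other value is treated as a fully-qualified class name. That Flume resolves interceptor types in this order is an assumption, and the names `InterceptorTypeLookupSketch` and `resolve` are hypothetical.

```java
import java.util.Locale;

public class InterceptorTypeLookupSketch {
    /** Mirror of the InterceptorType enum, with builder class names as strings. */
    enum Alias {
        TIMESTAMP("org.apache.flume.interceptor.TimestampInterceptor$Builder"),
        HOST("org.apache.flume.interceptor.HostInterceptor$Builder"),
        REGEX_FILTER("org.apache.flume.interceptor.RegexFilteringInterceptor$Builder");

        final String builderClassName;

        Alias(String builderClassName) {
            this.builderClassName = builderClassName;
        }
    }

    /**
     * Resolves a configured "type" value: a known alias maps to its builder
     * class name; anything else is returned unchanged as a class name.
     */
    public static String resolve(String type) {
        try {
            return Alias.valueOf(type.toUpperCase(Locale.ENGLISH)).builderClassName;
        } catch (IllegalArgumentException notAnAlias) {
            return type;
        }
    }
}
```

Without the REGEX_FILTER entry, the configuration in section 4.2.1 would have to name the builder class in full.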

