flume实例【一】监听目录日志上传到其他服务器


flume-ng简介

请参考官方文档:http://flume.apache.org/FlumeUserGuide.html

上传实例

需求说明:需要监控一个目录,并自动上传到服务器,且需要在传输过程中进行加密。 整体方案:n个client-agent -->server-agent
client-agent(客户端)配置文件:

a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #source
    a1.sources.r1.type = spooldir
    a1.sources.r1.channels = c1
    a1.sources.r1.basenameHeader = true
    #a1.sources.r1.ignorePattern = .+\.log$
    a1.sources.r1.bufferMaxLineLength = 1048576
    a1.sources.r1.batchSize = 5000
    #拦截器
    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = id
    a1.sources.r1.interceptors.i2.type = static
    a1.sources.r1.interceptors.i2.key = key
    #自定义拦截器,可以对所传输的event进行加密等操作
    a1.sources.r1.interceptors.i3.type = com.landray.behavior.interceptor.BehaviorClientSerurityHDFSInterceptor$Builder
    #file channel
    a1.channels = c1
    a1.channels.c1.type = file
    #检查点目录
    a1.channels.c1.checkpointDir = ./checkpoint
    a1.channels.c1.dataDirs = ./data
    #sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.compression-type = deflate
    a1.sinks.k1.compression-leve = 9
    a1.sinks.k1.batch-size = 5000
    a1.sinks.k1.hostname = http://test.com.cn
    a1.sinks.k1.port = 5281
    a1.sinks.k1.request-timeout = 20000
    #user define
    #需要上传的日志目录
    a1.sources.r1.spoolDir =D:/flume_tes/source
    #静态拦截器,给event header中添加属性
    #客户唯一ID
    a1.sources.r1.interceptors.i1.value = customerid
    #秘钥
    a1.sources.r1.interceptors.i2.value = key
    

server-agent(服务端)配置文件:

a2.channels = c2
    a2.sinks = k2
    #source
    a2.sources.r2.type = avro
    a2.sources.r2.channels = c2
    a2.sources.r2.compression-type = deflate
    a2.sources.r2.bind = localhost
    a2.sources.r2.port = 5281
    a2.sources.r2.interceptors = i1
    #拦截器,对传输的event进行解密
    a2.sources.r2.interceptors.i1.type = com.landray.behavior.interceptor.BehaviorServerSerurityInterceptor$Builder
    a2.channels = c2
    a2.channels.c2.type = file
    a2.channels.c2.checkpointDir = ./checkpoint
    a2.channels.c2.dataDirs = ./data
    a2.channels.c2.transactionCapacity = 20000
    #a2.channels.c2.type = memory
    #a2.channels.c2.capacity = 1000000
    #a2.channels.c2.transactionCapacity = 20000
    #a2.channels.c2.byteCapacityBufferPercentage = 20
    #default 80%
    #a2.channels.c2.byteCapacity = 800000
    #自定义sink按照自己的文件名以及格式进行输出
    a2.sinks.k2.type = com.landray.behavior.sink.BehaviorRollingFileSink
    a2.sinks.k2.channel = c2
    #no check
    a2.sinks.k2.sink.rollInterval = 0
    a2.sinks.k2.sink.batchSize = 20000
    #user define
    #windows,监控的目录
    a2.sinks.k2.sink.directory = D:/logs
    #linux,监控文件的目录
    #a2.sinks.k2.sink.directory = /home/nemo/logs
    

linux下启动的脚本

   #!/bin/sh   
    JAVA_OPTS=-Xmx1024m
    $JAVA_HOME/bin/java $JAVA_OPTS
    -Dlog4j.configuration=file:./conf/log4j.properties
    -cp "./lib/*" org.apache.flume.node.Application
    --conf-file ./target.conf --name a2
    #-Dlog4j指定log4j日志文件,可以自定义log输出
    #-cp 引用jar包
    #--conf-file 指定agent配置文件

windows下启动的

    @echo on  
set FLUME_HOME=%~dp0 echo %~dp0 rem 配置JAVA环境变量 set JAVA_HOME=%FLUME_HOME%..\jdk1.6 set PATH=%JAVA_HOME%\bin echo %JAVA_HOME% java -version java %JAVA_OPTS% -Dlog4j.configuration=file:./conf/log4j.properties -cp .;./lib/* org.apache.flume.node.Application --conf-file ./target_hdfs.conf --name a2 PAUSE

整个项目的目录如下图

nemotan /
Published under (CC) BY-NC-SA in categories flume  tagged with flume