发布时间:2023-02-13 13:30
说白了flume就是一个采集数据的软件,是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件;
flume的核心就是把数据从数据源(source)收集过来,为了保证传输的成功性,会先缓存数据(channel),待数据到达目的地(sink)的时候,再删除自己缓存的数据;
flume支持定制各类数据发送方,用于手机各类型的数据,同时也可以支持定制各种数据的接收方,用于最终存储数据
每一个agent相当于一个数据邮递员,将一些数据从一个地方运输到另外一个地方。
source:采集源,用于跟数据对接,以获取数据
sink:下层地,将数据发送到下一个agent或者是发往最终的目的地
channel:agent内部的一个传输数据的通道 用于连接source和sink,将数据从source传递到sink;
在整个数据的传输过程中,流动的是event,将数据进行封装,并携带头信息。
一个完整的event包括:event headers、event body、event信息、其中event信息就是flume手机到的日记记录。
上传安装包到数据源所在节点上
然后解压 tar -zxvf apache-flume-1.8.0-bin.tar.gz -C 安装的目录
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME
执行: vi netcat-logger.conf
文件内容如下:netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动agent去采集数据
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
3) 测试
先要往agent采集监听的端口上发送数据,让agent有数据可采。
随便在一个能跟agent节点联网的机器上:
telnet anget-hostname port (telnet localhost 44444)
采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新的文件产生时,就需要把文件采集到的hdfs
采集三大要素
source:监控目录--spooldir
sink:hdfs sink
channel:可以用file channel也可以用内存channel
步骤:
1)配置文件在flume中的conf中
2)开启命令 bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
3)往监测的目录中加入数据 此时就会读取加入的数据发送到hdfs上
注意:
1) 在收集文件夹的时候, 不允许在文件夹下面传递相同文件名的文件, 否则flume将会直接罢工不干活
2) 启动之前, 目录是要存在的
解决:将同名称删除, 然后重启agent才可以使用
采集需求: 比如业务系统使用log4j生成的日志, 日志内容不断增加, 需要把追加到日志文件中的数据实时采集到hdfs中
采集的三大要素
source:控制文件内容更新 -- exec : 通过linux的命令获取对应数据 (常用于监控文件)
sink:hdfs sink
channel:内存channel
步骤:
1)配置监测文件到conf目录下
2)启动命令 bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
3)模拟数据的生成 向被检测的文件中写入动态的数据 这样就会被检测到然后按照指定的要求把数据写入到hdfs中 while true; do echo "test test....." >> /export/flumedata/hhh.log;sleep 0.5;done
Q: 如何知道这个文件产生了新的数据????
tail -f 文件路径
**参数解析:**
- **rollInterval**
默认值:30
hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
如果设置成0,则表示不根据时间来滚动文件;
注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
- **rollSize**
默认值:1024
当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
如果设置成0,则表示不根据临时文件大小来滚动文件;
- **rollCount**
默认值:10
当events数据达到该数量时候,将临时文件滚动成目标文件;
如果设置成0,则表示不根据events数据来滚动文件;
- **round**
默认值:false
是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。
- **roundValue**
默认值:1
时间上进行“舍弃”的值;
- **roundUnit**
默认值:seconds
时间上进行“舍弃”的单位,包含:second,minute,hour
负载均衡(load-balance)是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上
当一个节点出现了大量的数据, 导致这个节点执行效率会很低,将这个节点扩展多份 来共同承担处理的任务
注意: 如果有多个agent, 在启动的时候: 先启动那些远离数据源的agent
我这里有三台服务器
-- 第一台的配置:
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/flumedata/123.log
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#负载均衡的配置
agent1.sinkgroups.g1.processor.type = load_balance
# 是否开启黑名单机制
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
-- 第二台
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 52020
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-- 第三台
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
当一个节点出现了宕机的问题后, 将备份节点更改为active状态, 提供使用即可保证服务器集群不会出现宕机的风险通过配置sink的优先级, 来决定sink的执行的顺序
-- 第一台
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/flumedata/456.log
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000