Hadoop生态圈---flume

发布时间:2023-02-13 13:30

一、Flume基本介绍

1.1 什么是flume

      说白了flume就是一个采集数据的软件,是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件;

      flume的核心就是把数据从数据源(source)收集过来,为了保证传输的成功性,会先缓存数据(channel),待数据到达目的地(sink)的时候,再删除自己缓存的数据;

      flume支持定制各类数据发送方,用于手机各类型的数据,同时也可以支持定制各种数据的接收方,用于最终存储数据

1.2 运行机制

             Hadoop生态圈---flume_第1张图片

每一个agent相当于一个数据邮递员,将一些数据从一个地方运输到另外一个地方。

source:采集源,用于跟数据对接,以获取数据

sink:下层地,将数据发送到下一个agent或者是发往最终的目的地

channel:agent内部的一个传输数据的通道 用于连接source和sink,将数据从source传递到sink;

在整个数据的传输过程中,流动的是event,将数据进行封装,并携带头信息。

一个完整的event包括:event headers、event body、event信息、其中event信息就是flume手机到的日记记录。

1.3 组件

 

Hadoop生态圈---flume_第2张图片

二、 flume安装部署

        2.1安装

上传安装包到数据源所在节点上
然后解压  tar -zxvf apache-flume-1.8.0-bin.tar.gz -C 安装的目录
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

        2.2测试环境是否正常

执行: vi   netcat-logger.conf

文件内容如下:netcat-logger.conf

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 描述和配置sink组件:k1
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 描述和配置source  channel   sink之间的连接关系



a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2) 启动agent去采集数据
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console


    -c conf   指定flume自身的配置文件所在目录

​	-f conf/netcat-logger.con  指定我们所描述的采集方案

​	-n a1  指定我们这个agent的名字


3) 测试

先要往agent采集监听的端口上发送数据,让agent有数据可采。
随便在一个能跟agent节点联网的机器上:
telnet anget-hostname  port   (telnet localhost 44444)

三、flume采集案例

3.1 采集目录到hdfs

              采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新的文件产生时,就需要把文件采集到的hdfs

              采集三大要素

              source:监控目录--spooldir

              sink:hdfs sink

              channel:可以用file channel也可以用内存channel

              步骤:

                 1)配置文件在flume中的conf中

                 2)开启命令    bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n a1  -Dflume.root.logger=INFO,console

                 3)往监测的目录中加入数据  此时就会读取加入的数据发送到hdfs上

          注意: 
                1) 在收集文件夹的时候, 不允许在文件夹下面传递相同文件名的文件, 否则flume将会直接罢工不干活
                2) 启动之前, 目录是要存在的
                解决:将同名称删除, 然后重启agent才可以使用

3.2 采集文件到hdfs

                采集需求: 比如业务系统使用log4j生成的日志, 日志内容不断增加, 需要把追加到日志文件中的数据实时采集到hdfs中

                采集的三大要素

                source:控制文件内容更新 --  exec :  通过linux的命令获取对应数据 (常用于监控文件)

                sink:hdfs sink

                channel:内存channel 

                步骤:

                1)配置监测文件到conf目录下

                2)启动命令   bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1  -Dflume.root.logger=INFO,console

                3)模拟数据的生成 向被检测的文件中写入动态的数据 这样就会被检测到然后按照指定的要求把数据写入到hdfs中   while true; do echo "test test....." >> /export/flumedata/hhh.log;sleep 0.5;done

                Q: 如何知道这个文件产生了新的数据????
                      tail -f  文件路径

3.3 参数解析

**参数解析:**

- **rollInterval**

  默认值:30

  hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;

  如果设置成0,则表示不根据时间来滚动文件;

  注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;

- **rollSize**

  默认值:1024

  当临时文件达到该大小(单位:bytes)时,滚动成目标文件;

  如果设置成0,则表示不根据临时文件大小来滚动文件;

- **rollCount**

  默认值:10

  当events数据达到该数量时候,将临时文件滚动成目标文件;

  如果设置成0,则表示不根据events数据来滚动文件;

- **round**

  默认值:false

  是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。

- **roundValue**

  默认值:1

  时间上进行“舍弃”的值;

- **roundUnit**

  默认值:seconds

  时间上进行“舍弃”的单位,包含:second,minute,hour

四、负载均衡

           负载均衡(load-balance)是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上

Hadoop生态圈---flume_第3张图片

        当一个节点出现了大量的数据, 导致这个节点执行效率会很低,将这个节点扩展多份 来共同承担处理的任务 
        注意: 如果有多个agent, 在启动的时候:  先启动那些远离数据源的agent

我这里有三台服务器

-- 第一台的配置:
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set gruop
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/flumedata/123.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02

agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#负载均衡的配置
agent1.sinkgroups.g1.processor.type = load_balance
# 是否开启黑名单机制
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

-- 第二台
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop02
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

-- 第三台

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1




bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

五、容错机制

当一个节点出现了宕机的问题后, 将备份节点更改为active状态, 提供使用即可保证服务器集群不会出现宕机的风险通过配置sink的优先级, 来决定sink的执行的顺序

-- 第一台
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set gruop
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/flumedata/456.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000


 

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号