当前位置: 代码迷 >> 综合 >> Flume的进阶介绍和使用(Interceptor,Selectors,Processors)
  详细解决方案

Flume的进阶介绍和使用(Interceptor,Selectors,Processors)

热度:2   发布时间:2023-12-12 03:43:56.0

Flume的引入

  • 关于Flume的介绍和使用,官网已经给了比较详细的介绍。本文在这里做一个总结。
  • Flume是Apache下的一个开源的顶级项目,它是一个分布式,可扩展,高可用,高可靠的,轻量级数据收集框架,主要用来做数据的收集,聚合,和传输,相对与传统的数据导入导出框架Sqoop,它具备多方面优势,如,简单易用,Flume只需要一个简单的配置文件即可启动;功能全面,Flume在Source,Channel,Sink三个组件部分提供了很多功能,比如Source端多种类型以支持不同的数据收集需求,并可以自定义Source来满足特定的需求,Source的拦截器(Interceptor),Channel的Selector,Sink端的Processor(下文会一一介绍),并可配置压缩格式,定义文件格式等。

Flume基本概念

  • 关于Flume的基本概念,如,Agent,Source,Channel,Sink可以参照之前博客,该篇文章也有Flume 的基本使用介绍,本篇文章就不做过度阐述,这里主要拎出Flume中重要概念之一,Event。
  • Event: Event是Flume事件处理的最小单元,Flume在读取数据源时,会将一行数据(也就是遇到\r\n)包装成一个Event,它主要有俩个部分,Header和Body, Header主要是以Key,Value的形式用来记录该数据的一些冗余信息,可用来标记数据唯一信息,利用Header的信息,我们可以对数据做出一些额外的操作,如对数据进行一个简单过滤,Body则是存入真正数据的地方。

Flume架构

  • Flume本身的架构特点保证了端到端的数据一致性,数据经过Source被存入到Channel中,如果选择合适的Channel可以保证数据的零丢失,并且Channel 中的数据只有在被Sink 端消费(也就是数据传输到下一个Agent或写入到文件后)才会在Channel中删除相应数据。
  • Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的事件传递。比如spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到channel且提交成功,那么source就将该文件标记为完成。同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种 原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到channel中,等待重新传递。
  • Flume 官网介绍了Flume的两种架构选型,扇入和扇出模型,这两种架构需相互结合,并配合上Flume本身提供Load_balance,Failover 机制来使整体设计的架构稳定,可靠

这里写图片描述
这里写图片描述

Flume Interceptor(拦截器)

  • Flume 的拦截器,直接从字面上理解的一种作用是拦截过滤指定的数据内容,做一个简单的清洗;还有一种作用是在Source写入到Channel中时,在Event的Header中添加一些有用的信息,比如添加Timestamp,下面介绍一些Flume中常见的Interceptor。
  • Timestamp Interceptor:
    时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://var/tmp/event/%Y%m%d

    hdfs.filePrefix = log_%Y%m%d_%H

    会根据时间戳将数据写入相应的文件中。

    但可以用其他方式代替(设置useLocalTimeStamp = true)。

    • 参数解释

      Property Name               Default          Description
      type                          –               指定为:timestamp
      header                     timestamp          header中的key的值
      preserveExisting             false            如果timestamp存在,是否保存
      
  • Host Interceptor:
    主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。

    • 参数解释

      Property Name                Default          Description
      type                          –               指定为:host
      preserveExisting              false           如果host存在,是否保存
      useIP                         true            使用IP,否则使用hostname
      hostHeader                    host            header中的key的值
      
  • Static Interceptor:
    静态拦截器,用于在events header中加入一组静态的key和value。

    • 参数解释

      Property Name                 Default         Description
      type                           –              指定为:static
      preserveExisting               true           如果定义的key存在,是否保存
      key                            key            header中的key的值
      value                          value          header中的value的值    
      
  • UUID Interceptor:
    UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。

    • 参数解释

      Property Name                 Default          Description
      type                             –             指定为:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
      headerName                       id            header中的key的值
      preserveExisting                true           如果UUID存在,是否保存
      prefix                           “”            UUID前缀
      
  • Regex Filtering Interceptor:
    通过正则来清洗数据。

    • 参数解释

      Property Name                  Default          Description
      type                              –             指定为:regex_filter
      regex                            ”.*”           与事件匹配的正则表达式
      excludeEvents                   false           按正则排除或匹配
      
  • Regex Extractor Interceptor:
    通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

    • 参数解释

      Property Name                   Default          Description
      type                                –            指定为:regex_extractor
      regex                               –            与事件匹配的正则表达式
      serializers                         –            序列化value的方式,flume提供俩种内置方式:org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer ,org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
      serializers.<s1>.type            default         默认为default(org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, 或者自定义class实现 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
      serializers.<s1>.name               –            key的名称
      serializers.*                       –            Serializer-specific properties
      

Flume Channel Selectors(选择器)

  • Flume Channel Selectors 官方提供了两种选择,Replicating(复制)和Multiplexing(复用),当然,我们也可自定义Selectors,Replicating(复制):同一个Event会发送给每一个Channel 中;Multiplexing(复用):同一个Event只会选择多个Channel中的一个发送,默认使用Replicating Selector
  • Replicating Channel Selector (default):

    • 参数解释

      Property Name                    Default         Description
      selector.type                    replicating     可指定类型:replicating
      selector.optional                   –            标记可选项的Channel名称
      

      注:当一个Channel被标记为可选项时,表示对该Channel写入失败时会被简单的忽略,而没被标记的Channel在写入失败时会导致Event传输停止

  • Multiplexing Channel Selector:

    • 参数解释

      Property Name                   Default           Description
      selector.type                  replicating        指定为:multiplexing
      selector.header              flume.selector.header   header中key的名称
      selector.default                     –            指定默认的Event传输的Channel
      selector.mapping.*                   –            传输到指定Channel的value属性
      

Flume Sink Processors(处理器)

  • Flume官网提供了三种Processors可供选择,用户自定义Processor暂时还没被支持,使用Flume Sink Processors我们可以做负载均衡(load balancing)和fail over切换。
  • Default Sink Processor:

    • 参数解释

      Property Name                  Default          Description
      sinks                             –             指定sink组的list,空格分隔
      processor.type                 default          processor类型default, failover or load_balance
      

      默认Sink Processor不会强制用户去创建sink group,可以采用最简单的source–channel–sink的单sink。

  • Failover Sink Processor:

  • FailoverSink Processor会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。故障转移的工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。在这配置中,要设置sinkgroups processor为failover,需要为所有的sink分配优先级,所有的优先级数字必须是唯一的,这个得格外注意。此外,failover time的上限可以通过maxpenalty 属性来进行设置。

    • 参数解释

      Property Name                   Default         Description
      sinks                              –            指定sink组中sink的名称,空格分隔
      processor.type                  default         指定类型为:failover
      processor.priority.<sinkName>      –            指定sink组中sink的优先权,用数字表示,数字越大级别越高,且是唯一值
      processor.maxpenalty             30000          故障sink的最大冷却周期
      
  • Load balancing Sink Processor:

  • 负载均衡片处理器提供在多个Sink之间负载平衡的能力。实现支持通过round_robin(轮询)或者random(随机)参数来实现负载分发,默认情况下使用round_robin,但可以通过配置覆盖这个默认值。还可以通过集成AbstractSinkSelector类来实现用户自己的选择机制。

    当被调用的时候,这选择器通过配置的选择规则选择下一个sink来调用。

    • 参数解释

      Property Name                   Default          Description
      processor.sinks                   –              指定sink组中sink的名称,空格分隔
      processor.type                  default          指定类型为:load_balance
      processor.backoff                false           Should failed sinks be backed off exponentially.
      processor.selector            round_robin        指定负载机制
      processor.selector.maxTimeOut    30000           Used by backoff selectors to limit exponential backoff (in milliseconds)
      
  相关解决方案