• <nav id="dstbx"></nav>

    <nav id="dstbx"></nav>
    1. <form id="dstbx"></form>
    2. <small id="dstbx"></small>

      教育行業A股IPO第一股(股票代碼 003032)

      全國咨詢/投訴熱線:400-618-4000

      Apache Flume timestamp和host攔截器使用

      更新時間:2019年11月08日14時53分 來源:傳智播客 瀏覽次數:

      一、 Flume攔截器介紹

      攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的時間,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。

      flume內置了很多攔截器,并且會定期的添加一些攔截器,下面我們學習flume內置的,兩個經常使用的攔截器。

      1. Timestamp Interceptor(時間戳攔截器)

      flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置。

      參數默認值描述type類型名稱timestamp,也可以使用類名的全路徑。preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值。

      a1.sources.r1.interceptors = timestamp
      
      a1.sources.r1.interceptors.timestamp.type=timestamp
      
      a1.sources.r1.interceptors.timestamp.preserveExisting=false
      
      

      2. Host Interceptor(主機攔截器)

      主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。時間報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置參數 默認值 描述 type 類型名稱host hostHeader host 事件投的key useIP true 如果設置為false,host鍵插入主機名 preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換host報頭的值

      a1.sources.r1.interceptors = host
      
      a1.sources.r1.interceptors.host.type=host
      
      a1.sources.r1.interceptors.host.useIP=false
      
      a1.sources.r1.interceptors.timestamp.preserveExisting=true
      
      

      二、 業務需求

      使用flume內置攔截器完成如下需求:

      Flume 攔截器1

      1. agent配置

      
      # 03 timestamp and host interceptors work before source
      a1.sources.r1.interceptors = i1
      i2  # 兩個interceptor串聯,依次作用于event
      a1.sources.r1.interceptors.i1.type = timestamp
      a1.sources.r1.interceptors.i1.preserveExisting = false
      
      a1.sources.r1.interceptors.i2.type = host
      # flume event的頭部將添加 “hostname”:實際主機名
      a1.sources.r1.interceptors.i2.hostHeader = hostname  # 指定key,value將填充為flume agent所在節點的主機名
      a1.sources.r1.interceptors.i2.useIP = false  # IP和主機名,二選一即可
      
      # 04 hdfs sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = hdfs: // master:9000 / flume / % Y - % m - % d /  
      # hdfs sink將根據event header中的時間戳進行替換
      # hostHeader的值保持一致,hdfs sink將提取eventkeyhostnmae的值,基于該值創建文件名前綴
      a1.sinks.k1.hdfs.filePrefix = % {hostname}  # hdfs sink將根據event header中的hostnmae對應的value進行替換
      a1.sinks.k1.hdfs.fileType = DataStream
      a1.sinks.k1.hdfs.writeFormat = Text
      a1.sinks.k1.hdfs.rollInterval = 0
      a1.sinks.k1.hdfs.rollCount = 10
      a1.sinks.k1.hdfs.rollSize = 1024000
      
      # channel,memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # bind source,sink to channel
      a1.sinks.k1.channel = c1
      a1.sources.r1.channels = c1
      
      

      三、 驗證攔截器效果

      1. 驗證思路

      1)先將interceptor作用后的event,通過logger sink打印到console,驗證header是否正常添加

      2)修改sink為hdfs, 觀察目錄和文件的名稱是否能夠按照預期創建(時間戳-目錄,hostname-文件前綴)

      2. 驗證過程

      1)發送header為空的http請求,logger sink打印event到終端,觀察event header中是否被添加了timestamp以及hostname

      
      # 01 define agent name, source/sink/channel name
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 02 source,http,jsonhandler
      a1.sources.r1.type = http
      a1.sources.r1.bind = node - 1
      a1.sources.r1.port = 8888
      a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
      
      # 03 timestamp and host interceptors work before source
      a1.sources.r1.interceptors = i1
      i2
      a1.sources.r1.interceptors.i1.type = timestamp
      a1.sources.r1.interceptors.i1.preserveExisting = false
      
      a1.sources.r1.interceptors.i2.type = host
      a1.sources.r1.interceptors.i2.hostHeader = hostname
      a1.sources.r1.interceptors.i2.useIP = false
      
      # 04 hdfs sink
      a1.sinks.k1.type = logger
      
      # channel,memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # bind source,sink to channel 
      a1.sinks.k1.channel = c1
      a1.sources.r1.channels = c1
      
      

      啟動flume agent

      
      bin/flume-ng agent -c ./conf -f ./conf/http_sink_logger_source.conf -n a1 -Dflume.root.logger=INFO,console
      
      

      發送請求測試:

      
      curl -X POST -d '[{"header":{},"body":"time-host-interceptor001"}]' http://node-1:8888
      
      

      可以看到終端輸出的event header中已經有了攔截器的信息

      Flume 攔截器2

      修改sink為hdfs, 觀察HDFS的目錄名(時間戳)和文件前綴(hostnme)

      flume攔截器3


      目錄名被正常替換(基于event header中的時間戳)

      flume攔截器4

      文件前綴被正常替換(基于event header中的hostname:實際主機名)

      flume 攔截器5

      文件內容被寫入為event的body

      Flume攔截器6

      本文來自:傳智播客大數據培訓學院

      神马影院我不卡,农村丰满肥熟老妇女,午夜电影网,2018日日摸夜夜添夜夜添 网站地图