fluent-plugin-datahub

DataHub基本介绍

DataHub服务是阿里云提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub服务可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到DataHub的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。

DataHub服务基于阿里云自研的飞天平台,具有高可用,低延迟,高可扩展,高吞吐的特点。DataHub与阿里云流计算引擎StreamCompute无缝连接,用户可以轻松使用SQL进行流数据分析。

DataHub服务也提供流式数据归档的功能,支持流式数据归档进入MaxCompute(原ODPS)。

环境要求

使用此插件,需要具备如下环境:

  1. Ruby 2.1.0 或更新
  2. Gem 2.4.5 或更新
  3. Fluentd-0.12 或更新 (Home Page)
  4. Ruby-devel

安装部署

安装部署Fluentd可以选择以下两种方式之一。

  1. 一键安装包适用于第一次安装Ruby&Fluentd环境的用户或局域网用户,一键安装包包含了所需的Ruby环境以及Fluentd。目前一键安装包仅支持Linux环境。
  2. 通过网络安装适用于对Ruby有了解的用户,需要提前确认Ruby版本,若低于2.1.0则需要升级或安装更高级的Ruby环境,然后通过RubyGem安装Fluentd。

注:

  • RubyGem源建议更改为https://ruby.taobao.org/
  • 局域网环境安装可以通过本地安装Gem文件 gem install --local fluent-plugin-datahub-0.0.1.gem

安装方式一:一键安装包安装

  1. 下载解压 fluentd-with-datahub-0.12.23.tar.gz
  2. 可以修改install.sh中$DIR为你想安装ruby的路径,默认会安装在当前路径下面
  3. 执行如下命令,提示“Success”表示安装成功 bash install.sh
  4. fluentd程序会被安装在当前目录的bin目录下面

安装方式二:通过网络安装

  1. Ruby安装(已经存在Ruby 2.1.0以上环境可忽略此步骤): wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.0.tar.gz tar xzvf ruby-2.3.0.tar.gz cd ruby-2.3.0 ./configure --prefix=DIR make make install 2 Fluentd以及插件安装 $ gem install fluent-plugin-datahub

插件使用示例

示例一 上传csv文件中的数据

配置

<source>
  @type tail
  path ${DIR}/csv_sample.csv
  tag test1
  format csv
  keys id,name,gender,salary,my_time
</source>
<match test1>
  @type datahub
  access_id 
  access_key 
  endpoint 
  project_name test_project
  topic_name fluentd_out_7
  # shard_id 6
  column_names ["id", "name", "gender", "salary", "my_time"]
  flush_interval 10s
  dirty_data_continue true
  dirty_data_file ${DIR}/dirty.data
  retry_times 3
</match>

示例二 上传日志文件中的数据

配置

source>
  @type tail
  path ${DIR}/log_sample.log
  tag test
  format /(?<request_time>\d\d:\d\d:\d\d.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class>\w+)\s+-\s+\[(?<request_id>\w+)\]\s+(?<detail>.+)/
</source>
<match test>
  @type datahub
  access_id yourAccessId
  access_key yourAccessKey
  endpoint yourEndpoint
  project_name test_project
  topic_name datahub_fluentd_out_1
  column_names ["thread_id", "log_level", "class"]
</match>

1、source标签中的keys为源数据,会根据key对应fields中字段<br> 2、match标签中的column_names为要写入datahub的字段 3、具体数据样例可参见gem包中的sample文件

参数说明

access_id :阿里云access_id. access_key :阿里云access key. endpoint :DataHub Endpoint project_name :datahub project name topic_name :datahub topic name retry_times :重试次数, 默认1 retry_interval :重试周期,下一次重试的间隔,单位为秒, 默认3 column_names :提交的列名,用户可以配置topic的列,采集部分列或者全部列,默认为空数组,表示按照topic的顺序及全字段提交,另外:列的配置不用保序,但是要求该字段在topic的schema中存在 source_keys :指定源头采集的keys, record 按照这些keys 获取数据, 写入datahub, 默认空数组, 此时record使用column_names 获取数据, 写入datahub dirty_data_continue :当出现脏数据时,是否继续写入,当开启该开关,必须指定@dirty_data_file文件 dirty_data_file :脏数据文件名称,当数据文件名称,在@dirty_data_continue开启的情况下,需要指定该值,特别注意:脏数据文件将被分割成两个部分.part1和.part2,part1作为更早的脏数据,part2作为更新的数据 shard_id :写入指定的 shard_id,默认轮询发送 shard_keys :按照指定字段的值计算hash,依据于该hash值落某个shard retry_limit :fluentd自带的 retry次数, 由于可能导致数据重写,该参数默认设置为0 put_data_batch_size :多少条数据 写一次datahub, 默认100条,请不要超出1000条。 data_encoding :默认使用源数据标示的encode方式,根据string.encoding获取,如果需要指定源数据编码方式,请设置该值,支持的类型:"US-ASCII", "ASCII-8BIT", "UTF-8",