Class: Fluent::Plugin::ArangoHttpApiOutput

Inherits:
Output
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_arango-http-api.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



49
50
51
# File 'lib/fluent/plugin/out_arango-http-api.rb', line 49

# Passes the plugin configuration straight through to the parent class;
# this plugin adds no configuration handling of its own here.
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object

Override `#format` if you want to customize how Fluentd stores events. Read the section “How to Customize the Serialization Format for Chunks” for details.



146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/out_arango-http-api.rb', line 146

# Serializes a single event for storage in the buffer chunk.
#
# Depending on the plugin settings, the tag and the formatted time are
# written into +record+ (the record is modified in place) before the
# [tag, time, record] triple is encoded.
#
# @param tag [String] event tag
# @param time [Object] event time, passed to @timef.format when the time key is enabled
# @param record [Hash] event record
# @return [String] the MessagePack-encoded [tag, time, record] triple
def format(tag, time, record)
  # Inject the tag and/or the time into the record when configured to do so.
  if @include_tag_key
    record[@tag_key] = tag
  end
  if @include_time_key
    record[@time_key] = @timef.format(time)
  end

  # Hand the event back in MessagePack form.
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object

This method is called when shutting down.



108
109
110
# File 'lib/fluent/plugin/out_arango-http-api.rb', line 108

# Shutdown hook: nothing plugin-specific is released here, so the call is
# simply forwarded to the parent class.
def shutdown
  super()
end

#start ⇒ Object

This method is called before starting.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_arango-http-api.rb', line 54

# Builds the request headers shared by every request and makes sure the
# target collection exists before any chunk is written.
#
# Flow: GET the collection; when the server answers 404, create the
# collection and then, if @index_fields is not empty, create the index
# described by the @index_* settings. Any other status on the initial GET
# means nothing is created.
def start
  super

  # The headers are the same for every request, so build them once here.
  # strict_encode64 is used on purpose: encode64 inserts a line feed after
  # every 60 encoded characters, and chomp only strips the last one, so long
  # credentials would put a line break inside the Authorization header.
  credentials = Base64.strict_encode64("#{@user}:#{@password}")
  @headers = {
    "Content-Type" => "application/json",
    "Authorization" => "Basic #{credentials}",
  }

  # Check whether the collection already exists.
  api_root = "http://#{@server}:#{@port}/_db/#{@database}/_api"
  uri = URI.parse("#{api_root}/collection/#{@collection}")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == "https"

  response = http.get(uri.path, @headers)
  log.debug 'collection check response', code: response.code, body: response.body

  # Only a 404 means the collection has to be created.
  return unless response.code == '404'

  # The collection is missing, so create it. waitForSync asks the server to
  # wait until writes have been synced to disk.
  # The create endpoint is POST /_api/collection with the name in the body,
  # not the per-collection path used for the lookup above.
  params = {
    "name" => @collection,
    "waitForSync" => true,
  }
  create_uri = URI.parse("#{api_root}/collection")
  response = http.post(create_uri.path, params.to_json, @headers)
  log.debug 'collection create response', code: response.code, body: response.body
  unless response.is_a?(Net::HTTPSuccess)
    # Surface the failure above debug level so it is not silently missed.
    log.warn 'failed to create collection', code: response.code, body: response.body
  end

  # Create the index only when index fields were configured.
  return if @index_fields.empty?

  index_uri = URI.parse("#{api_root}/index")
  index_uri.query = URI.encode_www_form({collection: @collection})

  params = {
    "type" => @index_type,
    "fields" => @index_fields,
    "unique" => @index_unique,
    "sparse" => @index_sparse,
    "name" => @index_name,
  }
  # Same host and port as before, so the existing Net::HTTP object is reused.
  response = http.post(index_uri.request_uri, params.to_json, @headers)
  log.debug 'index create response', code: response.code, body: response.body
  unless response.is_a?(Net::HTTPSuccess)
    log.warn 'failed to create index', code: response.code, body: response.body
  end
end

#write(chunk) ⇒ Object

Sync Buffered Output: implement `write()` if your plugin uses the normal buffer. Read “Sync Buffered Output” for details.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/out_arango-http-api.rb', line 116

# Sends every record of a buffer chunk to ArangoDB in one document-create
# request.
#
# @param chunk [Object] buffer chunk whose entries were serialized by #format
#   as MessagePack-encoded [tag, time, record] triples
# @raise [RuntimeError] when the server answers with a 5xx status, so that
#   the failure is reported to the caller instead of the chunk being
#   treated as written
def write(chunk)
  log.debug 'writing data to arangodb', chunk_id: dump_unique_id_hex(chunk.unique_id)

  # ArangoDB accepts an array of documents in a single request, so collect
  # all records of the chunk into one array. Tag and time are not sent.
  records = []
  chunk.open do |io|
    MessagePack::Unpacker.new(io).each do |_tag, _time, record|
      records << record
    end
  end

  # Call the ArangoDB document-create API.
  uri = URI.parse("http://#{@server}:#{@port}/_db/#{@database}/_api/document/#{@collection}")
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == "https"

  response = http.post(uri.path, records.to_json, @headers)

  # Note: even a 201 response can contain duplicate-key errors for
  # individual documents; those are not inspected here.
  log.debug 'document create response', code: response.code, body: response.body

  case response
  when Net::HTTPSuccess
    # Accepted by the server; nothing more to do.
  when Net::HTTPServerError
    # Returning normally would mark the chunk as delivered and lose its data,
    # so a server-side failure is raised instead.
    raise "ArangoDB returned #{response.code} while writing documents: #{response.body}"
  else
    # Other error statuses (e.g. 4xx) are unlikely to succeed on a retry, so
    # they are reported loudly rather than raised.
    log.error 'failed to write documents to arangodb', code: response.code, body: response.body
  end
end