Class: Fluent::HoopOutput

Inherits:
TimeSlicedOutput (which itself inherits from Object)
Includes:
FluentExt::PlainTextFormatterMixin
Defined in:
lib/fluent/plugin/out_hoop.rb

Instance Attribute Summary

Attributes included from FluentExt::PlainTextFormatterMixin

#add_newline, #default_tag, #field_separator, #output_data_type, #output_include_tag, #output_include_time, #remove_prefix

Instance Method Summary

Methods included from FluentExt::PlainTextFormatterMixin

#stringify_record

Constructor Details

#initialize ⇒ HoopOutput

Returns a new instance of HoopOutput.

Source lines 223–227:
# File 'lib/fluent/plugin/out_hoop.rb', line 223

# Builds the output plugin.
#
# After the parent constructor runs, loads the libraries the plugin's own
# methods rely on: net/http (Net::HTTP, used for the Hoop REST calls) and
# time (Time.strptime, used by path_format).
def initialize
  super
  %w[net/http time].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



Source lines 229–258:
# File 'lib/fluent/plugin/out_hoop.rb', line 229

# Validates the plugin configuration and derives connection settings.
#
# Before calling super, picks a time_slice_format matching the finest
# strftime placeholder found in the configured path (%S, %M, then %H). That
# way each time slice corresponds to one rendered path. After super has
# populated the config params:
# * @hoop_server must look like HOST:PORT; it is split into @host / @port.
# * @path must be an absolute HDFS path (start with '/').
# * @f_separator is derived from @field_separator (SPACE, COMMA, else TAB).
#
# @param conf [Hash-like] plugin configuration element
# @raise [Fluent::ConfigError] on a malformed hoop_server or a relative/missing path
def configure(conf)
  if conf['path']
    if conf['path'].index('%S')
      conf['time_slice_format'] = '%Y%m%d%H%M%S'
    elsif conf['path'].index('%M')
      conf['time_slice_format'] = '%Y%m%d%H%M'
    elsif conf['path'].index('%H')
      conf['time_slice_format'] = '%Y%m%d%H'
    end
  end

  super

  # \z (not \Z): \Z also matches before a trailing newline, which would let
  # a value such as "host:80\n" pass validation.
  server = /\A(?<host>[a-zA-Z0-9][-a-zA-Z0-9.]*):(?<port>\d+)\z/.match(@hoop_server)
  unless server
    raise Fluent::ConfigError, "Invalid config value on hoop_server: '#{@hoop_server}', needs SERVER_NAME:PORT"
  end
  @host = server[:host]
  @port = server[:port].to_i

  unless @path && @path.start_with?('/')
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end

  # NOTE(review): @conn and @header are not read by any method visible in
  # this file (start/send_data use @authorized_header); kept in case other
  # code depends on them.
  @conn = nil
  @header = {'Content-Type' => 'application/octet-stream'}

  @f_separator = case @field_separator
                 when 'SPACE' then ' '
                 when 'COMMA' then ','
                 else "\t"
                 end
end

#format(tag, time, record) ⇒ Object



Source lines 289–292:
# File 'lib/fluent/plugin/out_hoop.rb', line 289

# Renders one event as a single output line:
#   <formatted time> SEP <tag> SEP <serialized record> <line end>
# where SEP is @f_separator.
#
# @param tag [String] event tag
# @param time event time, as accepted by @timef.format
# @param record event record, serialized with record_to_string
# @return [String]
# NOTE(review): @timef and @line_end are not assigned in the visible code;
# presumably set by the formatter mixin or parent class — confirm.
def format(tag, time, record)
  sep = @f_separator
  stamp = @timef.format(time)
  stamp + sep + tag + sep + record_to_string(record) + @line_end
end

#path_format(chunk_key) ⇒ Object



Source lines 294–296:
# File 'lib/fluent/plugin/out_hoop.rb', line 294

# Turns a time-slice chunk key back into a concrete HDFS path.
#
# The key is parsed with @time_slice_format. The resulting Time then fills
# in the strftime placeholders of the configured @path.
#
# @param chunk_key [String] time-slice key of a buffer chunk
# @return [String] rendered path
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path)
end

#record_to_string(record) ⇒ Object



Source lines 285–287:
# File 'lib/fluent/plugin/out_hoop.rb', line 285

# Serializes an event record by calling its #to_json method.
# NOTE(review): #to_json needs the json library. It is not required in the
# visible part of this file, so presumably Fluentd loads it — confirm.
#
# @param record event record
# @return [String] JSON text
def record_to_string(record)
  serialized = record.to_json
  serialized
end

#send_data(path, data, retries = 0) ⇒ Object



Source lines 298–327:
# File 'lib/fluent/plugin/out_hoop.rb', line 298

# Appends +data+ to the HDFS file at +path+ through the Hoop HTTP API.
#
# Flow:
# * PUT <path>?op=append with the current @authorized_header.
# * On 401, refresh the cookie once via GET /?op=status and re-send the PUT.
# * On 404 (file missing), POST <path>?op=create&overwrite=false.
# * On 500, pause 0.3s and retry the whole sequence, up to 3 retries.
#
# @param path [String] absolute HDFS path
# @param data [String] payload to append
# @param retries [Integer] retries already performed (internal recursion counter)
# @return the final response; codes other than 200/201 are logged as a
#   warning and still returned
# @raise [Fluent::ConfigError] if the authentication cookie cannot be refreshed
# @raise [StandardError] if the server still answers 500 after 3 retries
def send_data(path, data, retries=0)
  # The block form of Net::HTTP.start closes the connection on every exit
  # path, including read timeouts and the ConfigError raised below.
  res = Net::HTTP.start(@host, @port) do |conn|
    conn.read_timeout = 5
    append_path = path + "?op=append"
    response = conn.request_put(append_path, data, @authorized_header)
    if response.code == '401'
      status = conn.request_get("/?op=status&user.name=#{@username}")
      if status.code.to_i < 300 && status['Set-Cookie']
        @authorized_header = {'Cookie' => status['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
      else
        $log.error "Failed to update authorized cookie, code: #{status.code}, message: #{status.body}"
        raise Fluent::ConfigError, "Failed to update authorized cookie, code: #{status.code}, message: #{status.body}"
      end
      response = conn.request_put(append_path, data, @authorized_header)
    end
    if response.code == '404'
      response = conn.request_post(path + "?op=create&overwrite=false", data, @authorized_header)
    end
    response
  end

  if res.code == '500'
    if retries >= 3
      raise StandardError, "failed to send_data with retry 3 times InternalServerError"
    end
    sleep 0.3 # yes, this is a magic number
    # The connection above is already closed. The retry opens its own
    # connection and does its own logging, so its result is returned as-is.
    return send_data(path, data, retries + 1)
  end

  unless %w[200 201].include?(res.code)
    $log.warn "failed to write data to path: #{path}, code: #{res.code} #{res.message}"
  end
  res
end

#shutdown ⇒ Object

Source lines 281–283:
# File 'lib/fluent/plugin/out_hoop.rb', line 281

# Stops the plugin. This override does no plugin-specific cleanup; it only
# forwards to the parent class's shutdown.
def shutdown
  super()
end

#start ⇒ Object

Source lines 260–279:
# File 'lib/fluent/plugin/out_hoop.rb', line 260

# Starts the plugin and obtains the initial authentication cookie.
#
# Sends GET /?op=status&user.name=<@username> to the Hoop server. If the
# response code is below 300 and carries Set-Cookie, the cookie is stored
# in @authorized_header for later requests. Any failure (connection error
# or rejected status request) is logged and re-raised to the caller.
#
# @raise [Fluent::ConfigError] if the status request fails or returns no cookie
def start
  super

  # okay, net/http has reconnect feature. see test_out_hoop_reconnect.rb
  begin
    # Connecting inside the begin block means refused/unreachable servers
    # are logged too. The block form closes the connection on every path,
    # including the raise below.
    Net::HTTP.start(@host, @port) do |conn|
      res = conn.request_get("/?op=status&user.name=#{@username}")
      if res.code.to_i < 300 && res['Set-Cookie']
        @authorized_header = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'}
      else
        $log.error "initalize request failed, code: #{res.code}, message: #{res.body}"
        raise Fluent::ConfigError, "initalize request failed, code: #{res.code}, message: #{res.body}"
      end
    end
  rescue
    $log.error "failed to connect hoop server: #{@host} port #{@port}"
    raise
  end
  $log.info "connected hoop server: #{@host} port #{@port}"
end

#write(chunk) ⇒ Object



Source lines 329–338:
# File 'lib/fluent/plugin/out_hoop.rb', line 329

# Flushes one buffer chunk to HDFS.
#
# The target path is rendered from the chunk's key with path_format. The
# chunk's contents are then sent there with send_data. An error raised
# while reading or sending is logged together with the target path and
# re-raised.
#
# @param chunk buffer chunk responding to #key and #read
# @return [String] the HDFS path the chunk was written to
def write(chunk)
  target = path_format(chunk.key)
  begin
    payload = chunk.read
    send_data(target, payload)
  rescue
    $log.error "failed to communicate server, #{@host} port #{@port}, path: #{target}"
    raise
  end
  target
end