Class: Fluent::GStoreOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_gstore.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GStoreOutput

Returns a new instance of GStoreOutput.



7
8
9
10
11
12
13
14
15
16
# File 'lib/fluent/plugin/out_gstore.rb', line 7

# Builds the plugin instance, then loads the libraries the plugin's methods
# refer to. The requires run at construction time rather than at file load.
#
# @return [GStoreOutput] a new instance of GStoreOutput
def initialize
  super
  # Load order matches the original listing.
  %w[gstore kconv zlib time tempfile pathname rexml/document].each do |library|
    require library
  end
end

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
# File 'lib/fluent/plugin/out_gstore.rb', line 25

# Hands the configuration to the parent class, then creates the formatter
# used by #format to render event timestamps.
#
# @param conf the plugin configuration, forwarded unchanged to the parent
# @return [Object] the newly created TimeFormatter
def configure(conf)
  super(conf)
  # NOTE(review): @time_format and @localtime are not assigned in this
  # method; presumably the parent's configure populates them - confirm.
  formatter_args = [@time_format, @localtime]
  @timef = TimeFormatter.new(*formatter_args)
end

#exists?(source) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_gstore.rb', line 44

# Guesses whether a response body fetched from the store belongs to an
# existing object.
#
# The body is handed to REXML: if it parses as XML the method answers false,
# and if parsing raises it answers true.
# NOTE(review): presumably the store answers a missing object with an XML
# error document while real (gzip) payloads are not XML - confirm. The
# original author left this marked as unfinished.
#
# Only StandardError (which includes REXML::ParseException) is treated as
# "exists"; Interrupt, SystemExit, NoMemoryError and similar are allowed to
# propagate instead of being silently turned into `true`.
#
# @param source [String, IO] response body to inspect
# @return [Boolean] false when +source+ parses as XML, true otherwise
def exists?(source)
  REXML::Document.new(source)
  # TODO: inspect the parsed document instead of treating any XML as "missing"
  false
rescue StandardError
  true
end

#format(tag, time, record) ⇒ Object



39
40
41
42
# File 'lib/fluent/plugin/out_gstore.rb', line 39

# Serialises one event as a single tab-separated line:
# formatted time, tag, then the record as JSON, ending with a newline.
#
# @param tag [String] the event tag
# @param time the event time, passed to the formatter built in #configure
# @param record [#to_json] the event payload
# @return [String] the formatted line
def format(tag, time, record)
  stamp = @timef.format(time)
  "#{stamp}\t#{tag}\t#{record.to_json}\n"
end

#start ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/fluent/plugin/out_gstore.rb', line 30

# Starts the plugin via the parent class, then creates the GStore client
# from the configured key id and secret key.
#
# @return [Object] the newly created GStore::Client
def start
  super
  credentials = {}
  credentials[:access_key] = @gstore_key_id
  credentials[:secret_key] = @gstore_sec_key
  # Passed as a single hash argument, exactly as before.
  @gstore = GStore::Client.new(credentials)
end

#write(chunk) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_gstore.rb', line 56

# Gzips a buffered chunk and uploads it to the configured bucket.
#
# The object name is "<@path><chunk.key>_<n>.gz", where n is the first
# index starting at 0 for which #exists? reports the fetched object as
# absent.
# NOTE(review): the probe loop has no upper bound; it only ends once
# #exists? returns false for some index.
#
# The chunk is compressed into a temporary file which is read back as raw
# bytes for the upload. The temporary file is always closed and removed,
# including when compression or the upload raises.
#
# @param chunk the buffer chunk; must respond to +key+ and +write_to(io)+
# @return [Object] whatever the client's put_object returns
def write(chunk)
  index = 0
  gstorepath = nil
  loop do
    gstorepath = "#{@path}#{chunk.key}_#{index}.gz"
    index += 1
    break unless exists?(@gstore.get_object(@gstore_bucket, gstorepath))
  end

  tmp = Tempfile.new("gstore-")
  # Gzip output is binary; avoid newline/encoding translation on write.
  tmp.binmode
  begin
    # The block form closes the gzip stream (flushing the trailer) exactly
    # once, even if chunk.write_to raises.
    Zlib::GzipWriter.wrap(tmp) { |gz| chunk.write_to(gz) }
    @gstore.put_object(
      @gstore_bucket,
      gstorepath,
      :data => File.binread(tmp.path))
  ensure
    # Close and unlink so temp files do not pile up between flushes.
    tmp.close(true)
  end
end