Class: Fluent::BufferizeOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_bufferize.rb

Defined Under Namespace

Classes: PosKeeper

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BufferizeOutput

Returns a new instance of BufferizeOutput.



78
79
80
# File 'lib/fluent/plugin/out_bufferize.rb', line 78

# Builds a new BufferizeOutput. All setup is delegated to the superclass
# constructor; this method adds no state of its own.
def initialize
  super
end

Instance Attribute Details

#output ⇒ Object (readonly)

Returns the value of attribute output.



76
77
78
# File 'lib/fluent/plugin/out_bufferize.rb', line 76

# Reader for @output, the object assigned in #configure via
# Plugin.new_output. Returns nil if #configure has not set it yet.
def output
  @output
end

Instance Method Details

#configure(conf) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/fluent/plugin/out_bufferize.rb', line 82

# Configures this plugin and the single output plugin it wraps.
#
# Exactly one nested <config> element must be present. Its 'type' value is
# passed to Plugin.new_output to build the wrapped output, and the <config>
# element itself is handed to that output's #configure.
#
# @param conf [#elements] configuration for this plugin; each item of
#   conf.elements must respond to #name and #[]
# @raise [ConfigError] if the number of <config> elements is not exactly one,
#   or if the <config> element has no 'type' value
def configure(conf)
  super

  configs = conf.elements.select { |e| e.name == 'config' }
  unless configs.size == 1
    raise ConfigError, "Bufferize: just one <config> directive is required"
  end
  config = configs.first

  type = config['type']
  unless type
    raise ConfigError, "Bufferize: 'type' parameter is required in <config> directive"
  end

  @output = Plugin.new_output(type)
  @output.configure(config)
end

#format(tag, time, record) ⇒ Object



109
110
111
# File 'lib/fluent/plugin/out_bufferize.rb', line 109

# Serializes one event for buffering: the tag, time and record are packed
# together, in that order, as a single array via Array#to_msgpack.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



104
105
106
107
# File 'lib/fluent/plugin/out_bufferize.rb', line 104

# Shuts this plugin down via the superclass first, then shuts down the
# wrapped output held in @output.
def shutdown
  super
  @output.shutdown
end

#start ⇒ Object



99
100
101
102
# File 'lib/fluent/plugin/out_bufferize.rb', line 99

# Starts this plugin via the superclass first, then starts the wrapped
# output held in @output.
def start
  super
  @output.start
end

#write(chunk) ⇒ Object



113
114
115
116
117
118
# File 'lib/fluent/plugin/out_bufferize.rb', line 113

# Re-emits the events of a buffered chunk into the wrapped output, one event
# per emit call, then removes the chunk's PosKeeper entry.
#
# There is no rescue/ensure here: if any emit raises, PosKeeper.remove is not
# reached. NOTE(review): presumably this lets a retried chunk resume from the
# position PosKeeper tracks -- confirm against PosKeeper.
def write(chunk)
  pending_events = PosKeeper.get(chunk)
  pending_events.each do |tag, time, record|
    event_stream = OneEventStream.new(time, record)
    @output.emit(tag, event_stream, NullOutputChain.instance)
  end
  PosKeeper.remove(chunk)
end