Class: Fluent::BufferizeOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_bufferize.rb

Defined Under Namespace

Classes: PosKeeper

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ BufferizeOutput

Returns a new instance of BufferizeOutput.



80
81
82
# File 'lib/fluent/plugin/out_bufferize.rb', line 80

# Builds the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass initializer.
def initialize
  super
end

Instance Attribute Details

#output ⇒ Object (readonly)

Returns the value of attribute output.



78
79
80
# File 'lib/fluent/plugin/out_bufferize.rb', line 78

# Read-only accessor for the @output instance variable.
#
# @return [Object, nil] whatever is currently stored in @output (nil if it has
#   not been assigned yet)
def output
  @output
end

Instance Method Details

#configure(conf) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_bufferize.rb', line 84

# Configures this plugin and the output plugin it wraps.
#
# After running the superclass configuration, +conf+ must contain exactly one
# child element named 'config', and that element must provide a 'type' value.
# The type is passed to Plugin.new_output, the result is stored in @output,
# and that object is configured with the same <config> element.
#
# @param conf [#elements] configuration element for this plugin
# @raise [ConfigError] when the number of <config> elements is not exactly
#   one, or when the <config> element has no 'type'
def configure(conf)
  super

  configs = conf.elements.select { |e| e.name == 'config' }
  unless configs.size == 1
    raise ConfigError, "Bufferize: just one <config> directive is required"
  end

  config = configs.first
  type = config['type']
  unless type
    raise ConfigError, "Bufferize: 'type' parameter is required in <config> directive"
  end

  @output = Plugin.new_output(type)
  @output.configure(config)
end

#format(tag, time, record) ⇒ Object



111
112
113
# File 'lib/fluent/plugin/out_bufferize.rb', line 111

# Serializes a single event for buffering.
#
# The three values are packed, in this order, into one array and the result
# of calling +to_msgpack+ on that array is returned.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever Array#to_msgpack returns for [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



106
107
108
109
# File 'lib/fluent/plugin/out_bufferize.rb', line 106

# Shuts this plugin down, then the object held in @output.
#
# The superclass shutdown runs first; @output.shutdown is only reached if
# that call returns normally.
def shutdown
  super
  @output.shutdown
end

#start ⇒ Object



101
102
103
104
# File 'lib/fluent/plugin/out_bufferize.rb', line 101

# Starts this plugin, then the object held in @output.
#
# The superclass start runs first; @output.start is only reached if that
# call returns normally.
def start
  super
  @output.start
end

#write(chunk) ⇒ Object



115
116
117
118
119
120
# File 'lib/fluent/plugin/out_bufferize.rb', line 115

# Replays the events of a chunk into the object held in @output.
#
# Every [tag, time, record] triple yielded by PosKeeper.get(chunk) is wrapped
# in a OneEventStream and emitted to @output with the NullOutputChain
# instance. After the last event, PosKeeper.remove(chunk) is called and its
# result is returned.
#
# NOTE(review): PosKeeper.remove is skipped when an emit raises — presumably
# so a retried chunk can resume; confirm against PosKeeper.
#
# @param chunk [Object] buffer chunk handed to PosKeeper
def write(chunk)
  PosKeeper.get(chunk).each do |event_tag, event_time, event_record|
    stream = OneEventStream.new(event_time, event_record)
    @output.emit(event_tag, stream, NullOutputChain.instance)
  end
  PosKeeper.remove(chunk)
end