Class: Fluent::UnitTimeFilterOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::UnitTimeFilterOutput
- Defined in:
- lib/fluent/plugin/unit_time_filter_buffer.rb,
lib/fluent/plugin/out_unit_time_filter.rb
Defined Under Namespace
Classes: Buffer
Constant Summary collapse
- BUFFER_KEY =
:unit_time_filter_buffer
Instance Method Summary collapse
Instance Method Details
#buffer ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 39
#
# Returns the Buffer kept on Thread.current under BUFFER_KEY.
#
# When nothing is stored there yet, a Buffer is built from this plugin's
# settings, stored under BUFFER_KEY, and returned; later calls that see the
# same Thread.current entry get that same object back.
#
# @return [Buffer] the stored (or newly created) buffer
def buffer
  Thread.current[BUFFER_KEY] ||= begin
    settings = {
      :filter            => @filter,
      :unit_sec          => @unit_sec,
      :prefix            => @prefix,
      :emit_each_tag     => @emit_each_tag,
      :pass_hash_row     => @pass_hash_row,
      :hash_row_time_key => @hash_row_time_key,
      :hash_row_tag_key  => @hash_row_tag_key,
    }
    Buffer.new(settings)
  end
end
#configure(conf) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 16
#
# Validates the plugin configuration and loads the filter script.
#
# After the parent's configure has run, the file at @filter_path is read and
# evaluated with instance_eval on a fresh Object. The value that evaluation
# returns is stored in @filter and must be a Proc.
#
# @param conf the configuration object, handed to the parent's configure via super
# @raise [Fluent::ConfigError] if the file does not exist, if evaluating it
#   raises (syntax errors included), or if it does not evaluate to a Proc
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    # NOTE(review): this executes the file's contents as Ruby code inside the
    # running process, so @filter_path must only ever point at trusted files.
    @filter = Object.new.instance_eval(File.read(@filter_path))
  rescue StandardError, ScriptError => e
    # SyntaxError descends from ScriptError, not StandardError, so a bare
    # `rescue` would let a malformed filter file escape as a raw SyntaxError
    # instead of being reported as a configuration error.
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.kind_of?(Proc)
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: Filter must be Proc"
  end
end
#emit(tag, es, chain) ⇒ Object
59 60 61 62 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 59
#
# Passes the tag and event stream to the current buffer's #resume, then
# calls chain.next.
#
# @param tag the tag handed to Buffer#resume
# @param es the event stream handed to Buffer#resume
# @param chain an object responding to #next, invoked after the buffer call
# @return whatever chain.next returns
def emit(tag, es, chain)
  current = buffer
  current.resume(tag, es)
  chain.next
end
#shutdown ⇒ Object
34 35 36 37 |
# File 'lib/fluent/plugin/out_unit_time_filter.rb', line 34
#
# Runs the parent's shutdown, then clears the entry stored on Thread.current
# under BUFFER_KEY so the Buffer is no longer referenced from there.
# Only the Thread.current seen by the caller of this method is touched.
def shutdown
  super

  Thread.current[BUFFER_KEY] = nil
end