Class: Fluent::Plugin::EventCollectorFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_event_collector.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



35
36
37
38
39
40
# File 'lib/fluent/plugin/filter_event_collector.rb', line 35

# Runs the parent class's configuration step, then creates this
# instance's event buffer and the lock that guards it.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   implementation
# @return [Mutex] the newly created lock (value of the last assignment)
def configure(conf)
  super(conf)

  # Starts out empty on every configure call.
  # NOTE(review): presumably holds partially collected events keyed by
  # event id -- confirm against update_event / publish_event.
  @event_buffer = {}

  # #filter wraps its buffer work in this lock.
  @event_buffer_lock = Mutex.new
end

#filter(tag, time, record) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/filter_event_collector.rb', line 49

# Routes a single record: records without an event key pass straight
# through, the closing record of an event is handed to +publish_event+,
# and every other record is handed to +update_event+ and suppressed.
#
# @param tag the fluent tag of the incoming record
# @param time the fluent time of the incoming record
# @param record [Hash] the incoming record
# @return the untouched record when it has no event key, whatever
#   +publish_event+ returns for a closing record, or nil otherwise
def filter(tag, time, record)
  # Pass through if we can't associate the record with an event
  return record unless record[event_key]

  @event_buffer_lock.synchronize do
    # Closing record: complete the event object and publish it to the rest
    # of the fluent chain (the block's value becomes the method's result)
    next publish_event(record) if record[end_tag_key] == end_tag_value

    # Keep fluent's tag and time alongside the data in case we need to
    # manually emit later; merge builds a copy, so the caller's record is
    # not modified here
    annotated = record.merge('fd_tag' => tag, 'fd_time' => time)
    update_event(annotated)

    # nil return halts the fluent event chain for this event
    nil
  end
end

#start ⇒ Object



42
43
44
45
46
47
# File 'lib/fluent/plugin/filter_event_collector.rb', line 42

# Runs the parent class's start step, then registers the event timeout
# timer.
#
# @return whatever +timer_execute+ returns
def start
  super

  # Set up event timeout: timeout_flush is the callback, event_timeout the
  # interval, and repeat: false is passed through to timer_execute.
  interval = event_timeout
  on_timeout = method(:timeout_flush)
  timer_execute(:event_timeout, interval, repeat: false, &on_timeout)
end