Class: Fluent::EventRouter

Inherits:
Object
Defined in:
lib/fluent/event_router.rb

Overview

EventRouter is responsible for routing events to a collector.

It has a list of MatchPattern and Collector pairs:

+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+

EventRouter does the following:

1) receives an event through one of the #emit methods
2) matches the event's tag against the MatchPatterns
3) forwards the event to the corresponding Collector

A Collector is an Output, a Filter, or another EventRouter.
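
The routing steps above can be illustrated with a minimal, self-contained sketch. It does not use Fluentd's own classes: `SketchRouter` and `SketchCollector` are hypothetical stand-ins, the event stream is a plain array of `[time, record]` pairs, and the regular expressions are hand-written approximations of the `access.**` and `archive.**` patterns in the diagram.

```ruby
# Hypothetical stand-in for an Output/Filter; records what it receives.
class SketchCollector
  attr_reader :name, :events

  def initialize(name)
    @name = name
    @events = []
  end

  # Mirrors the emit_events(tag, es) call made by the router.
  def emit_events(tag, es)
    es.each { |time, record| @events << [tag, time, record] }
  end
end

# Hypothetical, simplified router: first matching rule wins,
# otherwise the default collector is used.
class SketchRouter
  Rule = Struct.new(:pattern, :collector)

  def initialize(default_collector)
    @rules = []
    @default_collector = default_collector
  end

  def add_rule(pattern, collector)
    @rules << Rule.new(pattern, collector)
  end

  def match(tag)
    rule = @rules.find { |r| r.pattern.match?(tag) }
    rule ? rule.collector : @default_collector
  end

  def emit(tag, time, record)
    match(tag).emit_events(tag, [[time, record]])
  end
end

forward  = SketchCollector.new("forward")
s3       = SketchCollector.new("s3")
fallback = SketchCollector.new("default")

router = SketchRouter.new(fallback)
# Assumed regex equivalents of "access.**" and "archive.**".
router.add_rule(/\Aaccess(\.|\z)/, forward)
router.add_rule(/\Aarchive(\.|\z)/, s3)

router.emit("access.web", 0, { "path" => "/" })
router.emit("unknown.tag", 0, { "msg" => "hi" })
```

In this sketch the `access.web` event lands in `forward`, while `unknown.tag` matches no rule and falls through to the default collector.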

Defined Under Namespace

Classes: MatchCache, Pipeline, Rule

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(default_collector, emit_error_handler) ⇒ EventRouter

Returns a new instance of EventRouter.



# File 'lib/fluent/event_router.rb', line 44

def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
end

Instance Attribute Details

#default_collector ⇒ Object

Returns the value of attribute default_collector.



# File 'lib/fluent/event_router.rb', line 51

def default_collector
  @default_collector
end

#emit_error_handler ⇒ Object

Returns the value of attribute emit_error_handler.



# File 'lib/fluent/event_router.rb', line 52

def emit_error_handler
  @emit_error_handler
end

Instance Method Details

#add_rule(pattern, collector) ⇒ Object

Called by Agent to add a new match pattern and collector pair.



# File 'lib/fluent/event_router.rb', line 81

def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end

#emit(tag, time, record) ⇒ Object



# File 'lib/fluent/event_router.rb', line 85

def emit(tag, time, record)
  unless record.nil?
    emit_stream(tag, OneEventStream.new(time, record))
  end
end

#emit_array(tag, array) ⇒ Object



# File 'lib/fluent/event_router.rb', line 91

def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end

#emit_error_event(tag, time, record, error) ⇒ Object



# File 'lib/fluent/event_router.rb', line 101

def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end

#emit_stream(tag, es) ⇒ Object



# File 'lib/fluent/event_router.rb', line 95

def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
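
The method-level `rescue` in `emit_stream` means that an exception raised by the matched collector is handed to the error handler instead of propagating to the caller. A minimal sketch of that control flow follows; `FailingCollector`, `RecordingErrorHandler`, and `SketchEmitter` are hypothetical names used only for illustration, and only the `handle_emits_error(tag, es, e)` call shape is taken from the source above.

```ruby
# Hypothetical collector that always fails.
class FailingCollector
  def emit_events(tag, es)
    raise "downstream unavailable"
  end
end

# Hypothetical error handler that records what it is given.
class RecordingErrorHandler
  attr_reader :errors

  def initialize
    @errors = []
  end

  def handle_emits_error(tag, es, error)
    @errors << [tag, es, error.message]
  end
end

# Simplified emitter: same rescue shape as emit_stream, with a fixed collector.
class SketchEmitter
  def initialize(collector, error_handler)
    @collector = collector
    @error_handler = error_handler
  end

  def emit_stream(tag, es)
    @collector.emit_events(tag, es)
  rescue => e
    # `rescue => e` catches StandardError and its subclasses only.
    @error_handler.handle_emits_error(tag, es, e)
  end
end

handler = RecordingErrorHandler.new
emitter = SketchEmitter.new(FailingCollector.new, handler)
emitter.emit_stream("access.web", [[0, { "path" => "/" }]])
```

Because a bare `rescue => e` covers only `StandardError`, exceptions outside that hierarchy (for example `Interrupt`) would still propagate in this sketch.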

#match(tag) ⇒ Object



# File 'lib/fluent/event_router.rb', line 109

def match(tag)
  collector = @match_cache.get(tag) {
    find(tag) || @default_collector
  }
  collector
end
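
`match` relies on a get-with-block cache: the block runs only when the tag has not been seen, and because the block evaluates `find(tag) || @default_collector`, the fallback to the default collector is cached as well. The sketch below shows that pattern with a plain Hash. `SketchMatchCache` is a hypothetical stand-in; the real `MatchCache` is not shown on this page and may differ (for example, by bounding its size).

```ruby
# Hypothetical cache: compute-on-miss, backed by an unbounded Hash.
class SketchMatchCache
  def initialize
    @map = {}
  end

  # Return the cached value for key, or run the block once and store its result.
  def get(key)
    return @map[key] if @map.key?(key)
    @map[key] = yield
  end
end

lookups = 0
cache = SketchMatchCache.new

# The block stands in for `find(tag) || @default_collector`.
first  = cache.get("access.web") { lookups += 1; :forward }
second = cache.get("access.web") { lookups += 1; :forward }
```

Here the second call returns the stored value without running the block, so the rule list is scanned only once per distinct tag.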

#match?(tag) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/event_router.rb', line 105

def match?(tag)
  !!find(tag)
end

#suppress_missing_match! ⇒ Object



# File 'lib/fluent/event_router.rb', line 74

def suppress_missing_match!
  if @default_collector.respond_to?(:suppress_missing_match!)
    @default_collector.suppress_missing_match!
  end
end