Class: Fluent::EventRouter

Inherits:
Object
Defined in:
lib/fluent/event_router.rb

Overview

EventRouter is responsible for routing events to a collector.

It has a list of MatchPattern and Collector pairs:

+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  ---------> type forward   |
|     logs.**  ---------> type copy      |
|  archive.**  ---------> type s3        |
+----------------+     +-----------------+

EventRouter does:

1) receive an event at #emit methods
2) match the event’s tag with the MatchPatterns
3) forward the event to the corresponding Collector

A Collector is an Output, a Filter, or another EventRouter.
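The three steps above can be sketched with toy stand-ins. This is a minimal illustration, not Fluentd's implementation: `ToyRule`, `ToyCollector`, and `ToyRouter` are hypothetical names, the matcher is a simple prefix check rather than a real MatchPattern, and it assumes the first matching rule wins.

```ruby
# Toy stand-ins (hypothetical, not Fluentd classes).
ToyRule = Struct.new(:matcher, :collector)

class ToyCollector
  attr_reader :name, :received

  def initialize(name)
    @name = name
    @received = []
  end

  # Same method name the router calls on a collector in the source.
  def emit_events(tag, events)
    @received << [tag, events]
  end
end

class ToyRouter
  def initialize(default_collector)
    @rules = []
    @default_collector = default_collector
  end

  def add_rule(prefix, collector)
    # Simplified matcher (assumption): the tag equals the prefix
    # or starts with "prefix.".
    matcher = ->(tag) { tag == prefix || tag.start_with?("#{prefix}.") }
    @rules << ToyRule.new(matcher, collector)
  end

  # Step 1: receive the event.
  def emit(tag, time, record)
    # Step 2: match the tag against the rules, in insertion order.
    rule = @rules.find { |r| r.matcher.call(tag) }
    collector = rule ? rule.collector : @default_collector
    # Step 3: forward the event to the selected collector.
    collector.emit_events(tag, [[time, record]])
  end
end

forward = ToyCollector.new("forward")
fallback = ToyCollector.new("default")
router = ToyRouter.new(fallback)
router.add_rule("access", forward)

router.emit("access.web", 0, { "path" => "/" })
router.emit("metrics.cpu", 0, { "load" => 1 })
```

Here the `access.web` event reaches `forward`, while `metrics.cpu` matches no rule and falls through to the default collector.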

Defined Under Namespace

Classes: MatchCache, Pipeline, Rule


Constructor Details

#initialize(default_collector, emit_error_handler) ⇒ EventRouter

Returns a new instance of EventRouter.



# File 'lib/fluent/event_router.rb', line 44

def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
end

Instance Attribute Details

#default_collector ⇒ Object

Returns the value of attribute default_collector.



# File 'lib/fluent/event_router.rb', line 51

def default_collector
  @default_collector
end

#emit_error_handler ⇒ Object

Returns the value of attribute emit_error_handler.



# File 'lib/fluent/event_router.rb', line 52

def emit_error_handler
  @emit_error_handler
end

Instance Method Details

#add_rule(pattern, collector) ⇒ Object

Called by Agent to add a new match pattern and collector pair.



# File 'lib/fluent/event_router.rb', line 75

def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end

#emit(tag, time, record) ⇒ Object



# File 'lib/fluent/event_router.rb', line 79

def emit(tag, time, record)
  unless record.nil?
    emit_stream(tag, OneEventStream.new(time, record))
  end
end

#emit_array(tag, array) ⇒ Object



# File 'lib/fluent/event_router.rb', line 85

def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end
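
Both #emit and #emit_array wrap their input in an event stream and hand it to #emit_stream. The sketch below illustrates that delegation with a hypothetical `RecordingRouter` that stores what reaches `emit_stream`. Plain arrays of `[time, record]` pairs stand in for OneEventStream and ArrayEventStream, which is an assumption made to keep the example self-contained.

```ruby
# Hypothetical stand-in that records what reaches emit_stream.
class RecordingRouter
  attr_reader :streams

  def initialize
    @streams = []
  end

  # Mirrors the guard in #emit: a nil record is silently skipped.
  def emit(tag, time, record)
    emit_stream(tag, [[time, record]]) unless record.nil?
  end

  # An array of [time, record] pairs is passed through as one stream.
  def emit_array(tag, array)
    emit_stream(tag, array)
  end

  def emit_stream(tag, es)
    @streams << [tag, es]
  end
end

router = RecordingRouter.new
router.emit("app.log", 1, { "msg" => "hello" })
router.emit("app.log", 2, nil) # skipped: no stream is created
router.emit_array("app.log", [[3, { "msg" => "a" }], [4, { "msg" => "b" }]])
```

Only two streams are recorded: the single-event one and the two-event array. The call with a nil record never reaches `emit_stream`.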

#emit_error_event(tag, time, record, error) ⇒ Object



# File 'lib/fluent/event_router.rb', line 95

def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end

#emit_stream(tag, es) ⇒ Object



# File 'lib/fluent/event_router.rb', line 89

def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
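
The method-level `rescue` in #emit_stream means a failing collector does not raise into the caller; the exception goes to the error handler instead. A minimal sketch of that pattern follows, assuming hypothetical `FailingCollector`, `LoggingErrorHandler`, and `SafeRouter` classes. Only the method names `emit_events` and `handle_emits_error` are taken from the source above.

```ruby
# Hypothetical collector that always fails.
class FailingCollector
  def emit_events(tag, es)
    raise ArgumentError, "cannot process #{tag}"
  end
end

# Hypothetical handler that records errors instead of re-raising them.
class LoggingErrorHandler
  attr_reader :errors

  def initialize
    @errors = []
  end

  def handle_emits_error(tag, es, error)
    @errors << [tag, error.message]
  end
end

class SafeRouter
  def initialize(collector, handler)
    @collector = collector
    @handler = handler
  end

  # Same shape as #emit_stream: a bare `rescue => e` catches
  # StandardError and its subclasses, then delegates to the handler.
  def emit_stream(tag, es)
    @collector.emit_events(tag, es)
  rescue => e
    @handler.handle_emits_error(tag, es, e)
  end
end

handler = LoggingErrorHandler.new
router = SafeRouter.new(FailingCollector.new, handler)
router.emit_stream("app.log", [[0, { "msg" => "x" }]])
```

Note that `rescue => e` does not catch exceptions outside the StandardError hierarchy, such as `Interrupt` or `NoMemoryError`.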

#match(tag) ⇒ Object



# File 'lib/fluent/event_router.rb', line 103

def match(tag)
  collector = @match_cache.get(tag) {
    find(tag) || @default_collector
  }
  collector
end
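
#match passes a block to the cache so that the rule lookup runs only on a cache miss, and falls back to the default collector when no rule matches. The sketch below shows that block-based memoization with a hypothetical `ToyMatchCache`. The real MatchCache is not shown on this page and may differ, for example by bounding its size. The `find` lambda and the symbols standing in for collectors are also assumptions.

```ruby
# Hypothetical unbounded cache with a block-based get, for illustration only.
class ToyMatchCache
  def initialize
    @map = {}
  end

  # Returns the cached value for key, or runs the block once and stores it.
  def get(key)
    return @map[key] if @map.key?(key)

    @map[key] = yield
  end
end

lookups = 0
cache = ToyMatchCache.new

# Stand-in for the rule search: returns a collector or nil.
find = lambda do |tag|
  lookups += 1
  tag.start_with?("access.") ? :forward : nil
end

first  = cache.get("access.web")  { find.call("access.web")  || :default }
second = cache.get("access.web")  { find.call("access.web")  || :default }
other  = cache.get("metrics.cpu") { find.call("metrics.cpu") || :default }
```

The second lookup for `access.web` is served from the cache, so `find` runs only twice in total. Because the fallback is applied inside the block, the default collector is cached as well for tags that match no rule.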

#match?(tag) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/event_router.rb', line 99

def match?(tag)
  !!find(tag)
end