Class: Fluent::Plugin::RouteOutput
- Inherits:
-
BareOutput
- Object
- BareOutput
- Fluent::Plugin::RouteOutput
- Defined in:
- lib/fluent/plugin/out_route.rb
Defined Under Namespace
Classes: Route
Instance Attribute Summary collapse
-
#routes ⇒ Object
readonly
Returns the value of attribute routes.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #process(tag, es) ⇒ Object
- #tag_modifier(remove_tag_prefix, add_tag_prefix) ⇒ Object
Instance Attribute Details
#routes ⇒ Object (readonly)
Returns the value of attribute routes.
41 42 43 |
# File 'lib/fluent/plugin/out_route.rb', line 41
#
# Reader for the +@routes+ instance variable.
#
# @return [Object] whatever is currently stored in +@routes+
def routes
  @routes
end
Instance Method Details
#configure(conf) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/fluent/plugin/out_route.rb', line 66
#
# Validates the configuration and prepares the routing state.
#
# Rejects any configuration that contains a <store> element, delegates to the
# parent class, then builds one Route per entry of +@route_configs+, the
# optional default tag modifier, an empty match cache and the mutex used to
# guard that cache.
#
# NOTE(review): +@route_configs+, +@remove_tag_prefix+ and +@add_tag_prefix+
# are presumably populated by +super+ from the plugin parameters - confirm.
#
# @param conf [Object] configuration element; must respond to +elements(name:)+
# @raise [Fluent::ConfigError] if +conf+ contains a <store> section
def configure(conf)
  store_sections = conf.elements(name: 'store')
  raise Fluent::ConfigError, "<store> section is not available in route plugin" if store_sections.size > 0

  super

  @match_cache = {}
  @routes = []
  @route_configs.each do |route_conf|
    label_router = event_emitter_router(route_conf['@label'])
    route_modifier = tag_modifier(route_conf.remove_tag_prefix, route_conf.add_tag_prefix)
    @routes << Route.new(route_conf.pattern, label_router, route_modifier, route_conf.copy)
  end

  # Only build a plugin-level modifier when at least one prefix is set;
  # an `if` without `else` evaluates to nil otherwise.
  @default_tag_modifier =
    if @remove_tag_prefix || @add_tag_prefix
      tag_modifier(@remove_tag_prefix, @add_tag_prefix)
    end

  @mutex = Mutex.new
end
#process(tag, es) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fluent/plugin/out_route.rb', line 106
#
# Routes an event stream to every route whose pattern matches the tag.
#
# The (modified tag, matching routes) pair for each incoming tag is memoized
# in +@match_cache+. On a miss the tag is first passed through
# +@default_tag_modifier+ (when set), then +@routes+ is scanned in order;
# scanning stops at the first matching route that is not a copy route.
#
# With exactly one target the stream is emitted as-is. With several targets
# each one receives its own copy of the stream.
#
# @param tag [String] tag of the incoming events
# @param es [Object] event stream; must respond to +dup+ or +each+
# @return [Object] result of the last emit, or nil when nothing matched
def process(tag, es)
  modified_tag, targets = @match_cache[tag]
  unless targets
    modified_tag = @default_tag_modifier ? @default_tag_modifier.call(tag) : tag
    targets = []
    @routes.each do |r|
      if r.match?(modified_tag)
        targets << r
        break unless r.copy?
      end
    end

    @mutex.synchronize do
      if @match_cache.size >= @match_cache_size
        # Drop the oldest half of the entries (Hash keeps insertion order).
        # Always evict at least one key: with a cache size of 0 or 1 the
        # integer division yields 0 and the cache would grow without bound.
        evict_count = [@match_cache_size / 2, 1].max
        @match_cache.keys.first(evict_count).each { |key| @match_cache.delete(key) }
      end
      @match_cache[tag] = [modified_tag, targets]
    end
  end

  case targets.size
  when 0
    # do nothing
  when 1
    targets.first.emit(modified_tag, es)
  else
    targets.each do |target|
      # Each target gets its own stream so one consumer cannot affect another.
      dup_es = if es.respond_to?(:dup)
        es.dup
      else
        m_es = MultiEventStream.new
        es.each{|t,r| m_es.add(t, r) }
        m_es
      end
      target.emit(modified_tag, dup_es)
    end
  end
end
#tag_modifier(remove_tag_prefix, add_tag_prefix) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_route.rb', line 43
#
# Builds a lambda that rewrites a tag by swapping a leading prefix.
#
# If the tag starts with "<remove_tag_prefix>." that prefix is dropped; in all
# cases "<add_tag_prefix>." is prepended. A nil prefix is treated as empty.
# Results are memoized in a hash private to the returned lambda; writes to
# that hash are guarded by a mutex and its size is bounded by
# +@tag_cache_size+ (read once, when the lambda is built).
#
# @param remove_tag_prefix [String, nil] prefix to strip, without trailing dot
# @param add_tag_prefix [String, nil] prefix to prepend, without trailing dot
# @return [Proc] lambda taking a tag String and returning the rewritten tag
def tag_modifier(remove_tag_prefix, add_tag_prefix)
  tag_cache_size = @tag_cache_size
  cache = {}
  mutex = Mutex.new
  removed_prefix = remove_tag_prefix ? remove_tag_prefix + "." : ""
  added_prefix = add_tag_prefix ? add_tag_prefix + "." : ""
  ->(tag){
    if cached = cache[tag]
      cached
    else
      # Slice and concatenate instead of String#sub: sub would interpret
      # backslash sequences (e.g. "\0") in the added prefix as back-references.
      remainder = tag.start_with?(removed_prefix) ? tag[removed_prefix.size..-1] : tag
      modified = added_prefix + remainder
      mutex.synchronize do
        if cache.size >= tag_cache_size
          # Drop the oldest half of the entries (Hash keeps insertion order).
          # Always evict at least one key: with a cache size of 0 or 1 the
          # integer division yields 0 and the cache would grow without bound.
          evict_count = [tag_cache_size / 2, 1].max
          cache.keys.first(evict_count).each { |key| cache.delete(key) }
        end
        cache[tag] = modified
      end
      modified
    end
  }
end