Class: Fluent::FluentLogEventRouter

Inherits:
NullFluentLogEventRouter show all
Defined in:
lib/fluent/fluent_log_event_router.rb

Overview

This class handles fluentd’s internal log events, e.g. those captured by a <label @FLUENT_LOG> section or a <match fluent.**> section.

Constant Summary collapse

STOP =
:stop
GRACEFUL_STOP =
:graceful_stop

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from NullFluentLogEventRouter

#emittable?

Constructor Details

#initialize(event_router) ⇒ FluentLogEventRouter

Returns a new instance of FluentLogEventRouter.

Parameters:



85
86
87
88
89
90
# File 'lib/fluent/fluent_log_event_router.rb', line 85

# Sets up a router wrapper that has not been started yet.
#
# @param event_router [#emit] router that queued log events are emitted to by the
#   worker thread created in #start
def initialize(event_router)
  @event_router = event_router
  # Flipped to true by the worker once it pops GRACEFUL_STOP from the queue.
  @graceful_stop = false
  # No worker thread exists until #start is called.
  @thread = nil
  # Hand-off buffer between #emit_event callers and the worker thread.
  @event_queue = Queue.new
end

Class Method Details

.build(root_agent) ⇒ Object

Parameters:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/fluent_log_event_router.rb', line 39

# Picks the event router that fluentd's own log events should be sent to and wraps it.
#
# Lookup order:
# 1. the event router of the label named Fluent::Log::LOG_EVENT_LABEL;
# 2. if an ArgumentError is raised during step 1, the root agent's event router, but only
#    when it matches at least one of Fluent::Log.event_tags;
# 3. otherwise nothing, and a NullFluentLogEventRouter is returned.
#
# Tags that the chosen router does not match are reported through $log.warn.
#
# @param root_agent [#find_label, #event_router] agent used to look up the routers
# @return [FluentLogEventRouter, NullFluentLogEventRouter]
def self.build(root_agent)
  router = nil

  begin
    router = root_agent.find_label(Fluent::Log::LOG_EVENT_LABEL).event_router

    # Missing-match reports are silenced for the dedicated log label only;
    # the default event router keeps reporting them for ordinary events.
    router.suppress_missing_match!

    missing = Fluent::Log.event_tags.reject { |tag| router.match?(tag) }
    $log.warn("match for some tags of log events are not defined in @FLUENT_LOG label (to be ignored)", tags: missing) unless missing.empty?
  rescue ArgumentError
    # Per the original note, ArgumentError here means "#{label_name} label not found":
    # fall back to the top-level router when the dedicated label is not configured.
    root_router = root_agent.event_router
    tags = Fluent::Log.event_tags

    if tags.any? { |tag| root_router.match?(tag) }
      router = root_router
      missing = tags.reject { |tag| router.match?(tag) }

      if missing.empty?
        $log.warn "define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead"
      else
        sections = (tags - missing).map { |tag| "<match #{tag}>" }.join(', ')
        $log.warn "define #{sections} to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead"
        $log.warn "match for some tags of log events are not defined in top level (to be ignored)", tags: missing
      end
    end
  end

  return FluentLogEventRouter.new(router) if router

  $log.debug('No fluent logger for internal event')
  NullFluentLogEventRouter.new
end

Instance Method Details

#emit_event(event) ⇒ Object



135
136
137
# File 'lib/fluent/fluent_log_event_router.rb', line 135

# Queues a log event for the worker thread started by #start; nothing is emitted here.
#
# @param event [Array] the worker destructures this as (tag, time, record)
# @return [Object] the value returned by the queue's push operation
def emit_event(event)
  @event_queue << event
end

#graceful_stop ⇒ Object



129
130
131
132
133
# File 'lib/fluent/fluent_log_event_router.rb', line 129

# Requests a drain-then-exit shutdown of the worker and waits for it to finish.
#
# The GRACEFUL_STOP marker is queued behind any pending events, so events pushed
# before this call are emitted into the router before the worker exits.
#
# @return [Thread, nil] the joined worker thread, or nil when #start was never called
def graceful_stop
  @event_queue << GRACEFUL_STOP
  @thread.join if @thread
end

#start ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/fluent_log_event_router.rb', line 92

# Spawns the worker thread that drains the event queue into the event router.
#
# Each popped item is handled as follows:
# - STOP: leave the loop immediately, even if events are still queued;
# - GRACEFUL_STOP: remember the request and leave once the queue is empty;
# - anything else: destructure as (tag, time, record) and emit it.
#
# @return [true] the value assigned to the thread's abort_on_exception flag
def start
  @thread = Thread.new do
    # Log calls made on this thread must not generate further log events.
    $log.disable_events(Thread.current)

    loop do
      item = @event_queue.pop

      case item
      when STOP
        break
      when GRACEFUL_STOP
        @graceful_stop = true
      else
        begin
          tag, time, record = item
          @event_router.emit(tag, time, record)
        rescue => error
          # Events are disabled for this thread (see above), so this error log
          # is not turned into another log event.
          $log.error "failed to emit fluentd's log event", tag: tag, event: record, error: error
        end
      end

      break if @graceful_stop && @event_queue.empty?
    end
  end

  @thread.abort_on_exception = true
end

#stop ⇒ Object



123
124
125
126
127
# File 'lib/fluent/fluent_log_event_router.rb', line 123

# Tells the worker to exit as soon as it pops the STOP marker and waits for it.
#
# Joining is safe even if the thread was already joined earlier (Thread#join may
# be called any number of times).
#
# @return [Thread, nil] the joined worker thread, or nil when #start was never called
def stop
  @event_queue << STOP
  @thread.join if @thread
end