Module: Fluent::ServerModule

Defined in:
lib/fluent/supervisor.rb

Instance Method Summary collapse

Instance Method Details

#after_run ⇒ Object



75
76
77
78
79
80
81
# File 'lib/fluent/supervisor.rb', line 75

# Shuts down what the supervisor started, in a fixed order:
# the Windows event thread (Windows only), the RPC server (only when
# @rpc_endpoint is set), the counter server (only when @counter is set),
# the lock directory, and finally Fluent::Supervisor.cleanup_resources.
def after_run
  if Fluent.windows?
    stop_windows_event_thread
  end

  if @rpc_endpoint
    stop_rpc_server
  end

  if @counter
    stop_counter_server
  end

  cleanup_lock_dir
  Fluent::Supervisor.cleanup_resources
end

#before_run ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/supervisor.rb', line 41

# Prepares supervisor state before the server starts running.
#
# Steps, in order:
# 1. remembers the configuration text from config[:fluentd_conf] and resets
#    the RPC/counter instance variables;
# 2. creates a temporary lock directory and exports its path through the
#    FLUENTD_LOCK_DIR environment variable;
# 3. starts the RPC server when config[:rpc_endpoint] is set;
# 4. installs the Windows event handler or the POSIX signal handlers;
# 5. starts the counter server when config[:counter_server] is set;
# 6. unless config[:disable_shared_socket] is set, opens a ServerEngine
#    socket manager and exports its path through
#    SERVERENGINE_SOCKETMANAGER_PATH.
def before_run
  @fluentd_conf = config[:fluentd_conf]
  @rpc_endpoint = @rpc_server = @counter = nil

  lock_dir = Dir.mktmpdir("fluentd-lock-")
  @fluentd_lock_dir = lock_dir
  ENV['FLUENTD_LOCK_DIR'] = lock_dir

  endpoint = config[:rpc_endpoint]
  if endpoint
    @rpc_endpoint = endpoint
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  Fluent.windows? ? install_windows_event_handler : install_supervisor_signal_handlers

  counter_conf = config[:counter_server]
  run_counter_server(counter_conf) if counter_conf

  if config[:disable_shared_socket]
    $log.info "shared socket for multiple workers is disabled"
  else
    manager_path = ServerEngine::SocketManager::Server.generate_path
    ServerEngine::SocketManager::Server.open(manager_path)
    ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = manager_path.to_s
  end
end

#cleanup_lock_dir ⇒ Object



83
84
85
86
# File 'lib/fluent/supervisor.rb', line 83

# Deletes every "fluentd-*.lock" file inside @fluentd_lock_dir and then
# removes the directory itself.
def cleanup_lock_dir
  lock_file_pattern = File.join(@fluentd_lock_dir, "fluentd-*.lock")
  lock_files = Dir.glob(lock_file_pattern)
  FileUtils.rm(lock_files)
  FileUtils.rmdir(@fluentd_lock_dir)
end

#install_supervisor_signal_handlers ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/supervisor.rb', line 176

# Installs the supervisor's HUP, USR1 and USR2 signal handlers.
# Each handler writes a debug log line and delegates to the matching
# supervisor_sig*_handler method. Does nothing on Windows.
def install_supervisor_signal_handlers
  return if Fluent.windows?

  Signal.trap(:HUP) do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end

  Signal.trap(:USR1) do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end

  Signal.trap(:USR2) do
    $log.debug 'fluentd supervisor process got SIGUSR2'
    supervisor_sigusr2_handler
  end
end

#install_windows_event_handler ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/fluent/supervisor.rb', line 213

# Starts a background thread that waits on named Win32 events and maps them
# to supervisor actions (the Windows substitute for POSIX signals).
#
# Events are always registered under the per-process name "fluentd_<pid>",
# and additionally under config[:signame] when that is set. The thread keeps
# running until the "<pid name>_STOP_EVENT_THREAD" event fires, and closes
# every event handle when it exits.
#
# Does nothing (returns nil) on non-Windows platforms; otherwise returns the
# started Thread.
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{$$}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)
    events = [
      {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
      {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
      {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
      {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
    ]
    if @signame
      signame_events = [
        {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
        {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
        {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
        {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
        {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
      ]
      events.concat(signame_events)
    end
    begin
      infinite = 0xFFFFFFFF
      # The set of handles never changes once the thread is running.
      handles = events.map { |e| e[:win32_event] }
      loop do
        ipc_idx = ipc.wait_any(handles, infinite)
        # wait_any reports the signalled object with a 1-based index.
        event_idx = ipc_idx - 1

        unless event_idx >= 0 && event_idx < events.length
          $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
          # Never dispatch on an out-of-range index: a negative index would
          # wrap around to the last event and run its action, and a too-large
          # index would raise NoMethodError on nil and kill this thread.
          next
        end

        event = events[event_idx]
        $log.debug("Got Win32 event \"#{event[:win32_event].name}\"")
        case event[:action]
        when :stop
          stop(true)
        when :hup
          supervisor_sighup_handler
        when :usr1
          supervisor_sigusr1_handler
        when :usr2
          supervisor_sigusr2_handler
        when :cont
          supervisor_dump_handler_for_windows
        when :stop_event_thread
          break
        end
      end
    ensure
      events.each { |event| event[:win32_event].close }
    end
  end
end

#kill_worker ⇒ Object



336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/fluent/supervisor.rb', line 336

# Signals every worker process recorded in config[:worker_pid] and empties
# that table.
#
# Workers receive KILL on Windows and TERM elsewhere. The pid table is copied
# and cleared before any signal is sent. A worker that has already exited
# (Errno::ESRCH) is skipped so that the remaining workers are still
# signalled; other errors from Process.kill propagate unchanged.
#
# Returns nil when there is no worker pid table.
def kill_worker
  return unless config[:worker_pid]

  pids = config[:worker_pid].clone
  config[:worker_pid].clear
  signal = Fluent.windows? ? :KILL : :TERM

  pids.each_value do |pid|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      # The worker is already gone; keep going so the others are signalled.
      $log.debug "worker process #{pid} has already exited"
    end
  end
end

#reload ⇒ Object



206
207
208
209
210
# File 'lib/fluent/supervisor.rb', line 206

# Sends the "RELOAD\n" command to every monitor in @monitors.
def reload
  @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
end

#restart(graceful) ⇒ Object

Override some methods of ServerEngine::MultiSpawnWorker. Since Fluentd’s Supervisor doesn’t use ServerEngine’s HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers), they should be disabled on Windows as well; these overrides just send commands to the workers instead.



200
201
202
203
204
# File 'lib/fluent/supervisor.rb', line 200

# Sends a restart command to every monitor in @monitors.
#
# @param graceful [Boolean] truthy sends "GRACEFUL_RESTART\n",
#   otherwise "IMMEDIATE_RESTART\n" is sent.
def restart(graceful)
  command = graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n"
  @monitors.each { |monitor| monitor.send_command(command) }
end

#run_counter_server(counter_conf) ⇒ Object



164
165
166
167
168
169
170
# File 'lib/fluent/supervisor.rb', line 164

# Creates a Fluent::Counter::Server from the given configuration, stores it
# in @counter and starts it.
#
# @param counter_conf [#scope, #bind, #port, #backup_path] counter server
#   settings; bind/port/backup_path are passed as :host/:port/:path.
def run_counter_server(counter_conf)
  scope = counter_conf.scope
  server_options = {
    host: counter_conf.bind,
    port: counter_conf.port,
    log: $log,
    path: counter_conf.backup_path,
  }

  @counter = Fluent::Counter::Server.new(scope, server_options)
  @counter.start
end

#run_rpc_server ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/supervisor.rb', line 88

# Creates the RPC server for @rpc_endpoint, mounts the built-in endpoints
# and starts it. The server is stored in @rpc_server.
#
# On non-Windows platforms most endpoints send a signal to this process;
# on Windows the corresponding supervisor handler is called directly.
# Every handler returns nil, except /api/config.getDump which returns
# [nil, nil, res]. That endpoint is mounted only when @enable_get_dump
# is set.
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill(:INT, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.killWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill(:TERM, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill(:USR1, Process.pid)
      Process.kill(:TERM, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/plugins.flushBuffers') do |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill(:USR1, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.reload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill(:HUP, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.dump') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  end

  @rpc_server.mount_proc('/api/config.gracefulReload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill(:USR2, Process.pid)
    end

    nil
  end

  if @enable_get_dump
    @rpc_server.mount_proc('/api/config.getDump') do |_req, res|
      $log.debug "fluentd RPC got /api/config.getDump request"
      $log.info "get dump in-memory config via HTTP"
      res.body = supervisor_get_dump_config_handler
      [nil, nil, res]
    end
  end

  @rpc_server.start
end

#stop_counter_server ⇒ Object



172
173
174
# File 'lib/fluent/supervisor.rb', line 172

# Stops the counter server stored in @counter.
# Calls @counter.stop unconditionally, so @counter must be non-nil;
# after_run only invokes this method when @counter is set.
def stop_counter_server
  @counter.stop
end

#stop_rpc_server ⇒ Object



160
161
162
# File 'lib/fluent/supervisor.rb', line 160

# Shuts down the RPC server stored in @rpc_server.
# Calls @rpc_server.shutdown unconditionally, so @rpc_server must be non-nil;
# after_run only invokes this method when @rpc_endpoint is set.
def stop_rpc_server
  @rpc_server.shutdown
end

#stop_windows_event_thread ⇒ Object



271
272
273
274
275
276
277
# File 'lib/fluent/supervisor.rb', line 271

# Asks the Windows event thread to exit by signalling the
# "<@pid_signame>_STOP_EVENT_THREAD" event that
# install_windows_event_handler registers with the :stop_event_thread action.
#
# Does nothing on non-Windows platforms. The event handle is always closed,
# even when signalling it raises; such an error still propagates.
def stop_windows_event_thread
  return unless Fluent.windows?

  ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
  begin
    ev.set
  ensure
    # Release the handle even if #set failed, so it is not leaked.
    ev.close
  end
end

#supervisor_dump_config_handler ⇒ Object



350
351
352
# File 'lib/fluent/supervisor.rb', line 350

# Writes the supervisor's stored configuration (@fluentd_conf) to the log at
# info level. Used by the /api/config.dump RPC endpoint.
def supervisor_dump_config_handler
  $log.info @fluentd_conf
end

#supervisor_dump_handler_for_windows ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/fluent/supervisor.rb', line 316

# Windows-only dump handler: dumps the supervisor via
# FluentSigdump.dump_windows on a separate thread and forwards CONT to the
# workers.
#
# Any StandardError raised here, including the guard error raised when this
# is called on a non-Windows platform, is logged instead of propagated.
def supervisor_dump_handler_for_windows
  # On UNIX-like systems each process writes its dump file when it receives
  # SIGCONT, and that mechanism predates this Windows implementation.
  # SIGCONT could be trapped and handled here on UNIX-like systems too, but
  # for backward compatibility this handler remains Windows-only.
  unless Fluent.windows?
    raise "[BUG] This function is for Windows ONLY."
  end

  Thread.new do
    begin
      FluentSigdump.dump_windows
    rescue => dump_error
      $log.error "failed to dump: #{dump_error}"
    end
  end

  send_signal_to_workers(:CONT)
rescue => error
  $log.error "failed to dump: #{error}"
end

#supervisor_get_dump_config_handler ⇒ Object



354
355
356
# File 'lib/fluent/supervisor.rb', line 354

# Returns the supervisor's stored configuration (@fluentd_conf) wrapped in a
# Hash under the :conf key. Used as the response body of the
# /api/config.getDump RPC endpoint.
def supervisor_get_dump_config_handler
  { conf: @fluentd_conf }
end

#supervisor_sighup_handler ⇒ Object



279
280
281
# File 'lib/fluent/supervisor.rb', line 279

# Handles a HUP request on the supervisor by calling kill_worker.
# NOTE(review): the workers are presumably respawned automatically afterwards
# (the RPC reload endpoint describes this as "restart worker with auto
# restarting by killing") — confirm against the supervisor's restart logic.
def supervisor_sighup_handler
  kill_worker
end

#supervisor_sigusr1_handler ⇒ Object



283
284
285
286
# File 'lib/fluent/supervisor.rb', line 283

# Handles a USR1 request on the supervisor: reopens the log via reopen_log,
# then forwards USR1 to the workers via send_signal_to_workers.
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers(:USR1)
end

#supervisor_sigusr2_handler ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/fluent/supervisor.rb', line 288

# Handles a USR2 (graceful reload) request on the supervisor.
#
# The configuration is rebuilt and applied to Fluent::Engine on a separate
# thread. Only when that succeeds is the log reopened, USR2 forwarded to the
# workers, and @fluentd_conf replaced with the new configuration text.
# Any StandardError, including one raised on the reload thread and re-raised
# by join, is logged and leaves @fluentd_conf untouched.
def supervisor_sigusr2_handler
  new_conf = nil
  reloader = Thread.new do
    $log.info 'Reloading new config'

    # Build the configuration first so an invalid one fails before reloading.
    new_conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset { Fluent::Engine.reload_config(new_conf, supervisor: true) }
  end

  # Failures surface through join and are reported by the rescue below.
  reloader.report_on_exception = false
  reloader.join

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = new_conf.to_s
rescue => e
  $log.error "Failed to reload config file: #{e}"
end