Class: FluentdServer::SyncWorker

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/fluentd_server/sync_worker.rb

Overview

Constant Summary collapse

DEFAULT_INTERVAL =
60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, #logger, #logger=

Constructor Details

#initialize(opts = {}) ⇒ SyncWorker

Returns a new instance of SyncWorker.



17
18
19
20
21
# File 'lib/fluentd_server/sync_worker.rb', line 17

# Builds a worker without starting it.
#
# The sync period is taken from the first non-nil/non-false value among
# +opts[:interval]+, FluentdServer::Config.sync_interval and DEFAULT_INTERVAL.
#
# @param opts [Hash] worker options; stored as-is in +@opts+
def initialize(opts = {})
  # Filled by the signal handlers installed in #start.
  @signals = []
  @opts = opts
  @interval =
    opts[:interval] ||
    FluentdServer::Config.sync_interval ||
    DEFAULT_INTERVAL
end

Instance Attribute Details

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



11
12
13
# File 'lib/fluentd_server/sync_worker.rb', line 11

# Reader for the sync period held in +@interval+.
#
# @return [Object] the value the constructor assigned to +@interval+
def interval
  @interval
end

Class Method Details

.start(opts = {}) ⇒ Object



13
14
15
# File 'lib/fluentd_server/sync_worker.rb', line 13

# Convenience entry point: builds a worker from +opts+ and runs it.
#
# @param opts [Hash] options forwarded unchanged to the constructor
# @return [Object] whatever the instance-level #start returns
def self.start(opts = {})
  worker = new(opts)
  worker.start
end

Instance Method Details

#start ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluentd_server/sync_worker.rb', line 28

# Runs the scheduling loop in the calling process until a signal arrives.
#
# INT, HUP and TERM are only recorded in +@signals+ by their handlers, so the
# loop can leave at a well-defined point; PIPE is ignored. On each pass
# (paced by +sleep(0.5)+) the loop:
#
# 1. reaps the updater child if it has finished,
# 2. stops if any signal has been recorded,
# 3. once +@next_time+ has passed, schedules the next run and forks a child
#    that calls FluentdServer::SyncRunner.run with +@opts+ -- unless the
#    previous child is still running, in which case this run is skipped.
#
# After the loop, a child that is still running is waited for (blocking)
# before the method returns.
#
# @return [void]
def start
  Signal.trap(:INT)  { @signals << :INT }
  Signal.trap(:HUP)  { @signals << :HUP }
  Signal.trap(:TERM) { @signals << :TERM }
  Signal.trap(:PIPE, "IGNORE")

  update_next!
  logger.info("[sync] first updater start in #{@next_time}")

  childpid = nil
  while sleep(0.5)
    if childpid
      begin
        # With WNOHANG, waitpid2 returns nil while the child is still running
        # and [pid, Process::Status] once it has exited.
        _pid, status = Process.waitpid2(childpid, Process::WNOHANG)
        if status
          # exitstatus is nil when the child was killed by a signal; inspect
          # makes that visible instead of reporting a misleading 0.
          logger.debug("[sync] update finished pid: #{childpid}, code: #{status.exitstatus.inspect}")
          logger.debug("[sync] next updater start in #{@next_time}")
          childpid = nil
        end
      rescue Errno::ECHILD
        logger.warn("[sync] no child process")
        childpid = nil
      end
    end

    unless @signals.empty?
      logger.warn("[sync] signals_received: #{@signals.join(',')}")
      break
    end

    next if Time.now < @next_time

    # Advance the schedule before deciding whether to fork, so a skipped run
    # does not retrigger on every pass.
    update_next!
    logger.debug("[sync] (#{@next_time}) updater start")

    if childpid
      logger.warn("[sync] previous updater exists, skipping this time")
      next
    end

    childpid = fork do
      FluentdServer::SyncRunner.run(@opts)
    end
  end

  if childpid
    logger.warn("[sync] waiting for updater process finishing")
    begin
      # Must be called on Process: a bare `waitpid` is not defined on this
      # object and would raise NoMethodError during shutdown.
      Process.waitpid(childpid)
    rescue Errno::ECHILD
      # The child has already been reaped; nothing left to wait for.
    end
  end
end

#update_next! ⇒ Object



23
24
25
26
# File 'lib/fluentd_server/sync_worker.rb', line 23

# Schedules the next run at the first multiple of +@interval+ (in epoch
# seconds) strictly after the current time and stores it in +@next_time+.
#
# Works on whole epoch seconds so the result falls exactly on the interval
# boundary; subtracting from the raw Time would carry over the current
# sub-second fraction.
#
# @return [Time] the newly scheduled time
def update_next!
  epoch = Time.now.to_i
  @next_time = Time.at(epoch - (epoch % @interval) + @interval)
end