Module: QueueMap::Consumer::ForkStrategy

Defined in:
lib/queue_map/consumer.rb

Defined Under Namespace

Classes: ImaChild

Instance Method Summary collapse

Instance Method Details

#startObject



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/queue_map/consumer.rb', line 136

def start
  @child_pids = []
  @master_pid = Process.pid
  before_fork_procs.each { |p| p.call }
  begin
    (count_workers - 1).times do |c|
      pid = Kernel.fork
      raise ImaChild if pid.nil?
      @child_pids << pid
    end
    $0 = "#{name} queue_map consumer: master"
  rescue ImaChild
    $0 = "#{name} queue_map consumer: child"
    # Parent Child Testing Product was a success! http://is.gd/fXmmZ
  end
  Signal.trap("TERM") { stop(false) }
  Signal.trap("INT") { stop }
  after_fork_procs.each { |p| p.call }
  logger.info "#{Process.pid}: running"
  run_consumer
end

#stop(graceful = true) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/queue_map/consumer.rb', line 158

def stop(graceful = true)
  $0 = $0 + " [shutting down]"
  @child_pids.each do |pid|
    begin
      Process.kill(graceful ? "INT" : "TERM", pid)
    rescue Errno::ESRCH => e
      logger.error "Unable to signal process #{pid}. Does the process not exist?"
    end
  end

  logger.info "#{Process.pid}: stopping (graceful: #{graceful})"
  if graceful
    Thread.new { sleep job_timeout; stop(false) } # after job_timeout seconds, force shut down
    @shutting_down = true
  else
    begin
      Kernel.exit 0
    ensure
      Kernel.exit! 0
    end
  end
end