Class: CI::Queue::Redis::Base::HeartbeatProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/base.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:) ⇒ HeartbeatProcess

Returns a new instance of HeartbeatProcess.



267
268
269
270
271
272
273
274
# File 'lib/ci/queue/redis/base.rb', line 267

# Stores the connection URL, Redis key names and entry delimiter on the
# instance. Nothing is spawned or connected here; the values are only kept
# for later use (boot! forwards all of them to the monitor script).
#
# @param redis_url [Object] kept as @redis_url
# @param zset_key [Object] kept as @zset_key
# @param processed_key [Object] kept as @processed_key
# @param owners_key [Object] kept as @owners_key
# @param worker_queue_key [Object] kept as @worker_queue_key
# @param entry_delimiter [Object] kept as @entry_delimiter
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:)
  @redis_url, @zset_key, @processed_key = redis_url, zset_key, processed_key
  @owners_key, @worker_queue_key, @entry_delimiter = owners_key, worker_queue_key, entry_delimiter
end

Instance Method Details

#boot! ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/ci/queue/redis/base.rb', line 276

# Spawns the monitor script as a child process and waits for its first
# output on stdout.
#
# Two pipes are created. The write end of the first is kept in @pipe and its
# read end becomes the child's stdin; the write end of the second becomes the
# child's stdout so the parent can wait (up to 10 seconds) for it to become
# readable.
#
# If nothing becomes readable in time, the child is killed and reaped and the
# parent's pipe is closed before raising, so a stuck child can neither block
# this call forever nor be left behind as an orphan.
#
# @return [IO] the write end of the pipe connected to the child's stdin
# @raise [RuntimeError] if the child's stdout was not readable within 10 seconds
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  begin
    @pid = Process.spawn(
      RbConfig.ruby,
      ::File.join(__dir__, "monitor.rb"),
      @redis_url,
      @zset_key,
      @processed_key,
      @owners_key,
      @worker_queue_key,
      @entry_delimiter,
      in: child_read,
      out: child_write,
    )
  ensure
    # The parent never uses the child's ends; close them even if spawn raised.
    child_read.close
    child_write.close
  end

  begin
    unless ready_pipe.wait_readable(10)
      # Closing our end gives the child EOF on stdin, but a child that is
      # stuck may never notice, so force it down before reaping; otherwise
      # the wait below could block indefinitely.
      @pipe.close
      begin
        Process.kill(:KILL, @pid)
        Process.wait(@pid)
      rescue Errno::ESRCH, Errno::ECHILD
        # The child is already gone or already reaped; nothing left to clean up.
      end
      raise "Monitor child wasn't ready after 10 seconds"
    end

    # NOTE(review): EOF (child exited before writing anything) also makes the
    # pipe readable, in which case `gets` returns nil and the liveness check
    # below still passes for a not-yet-reaped child. Confirm whether that
    # case should be treated as a boot failure.
    ready_pipe.gets
  ensure
    ready_pipe.close
  end

  # Check the process is alive (raises Errno::ESRCH if the pid no longer exists).
  Process.kill(0, @pid)
  @pipe
end

#shutdown! ⇒ Object



308
309
310
311
312
313
314
315
316
# File 'lib/ci/queue/redis/base.rb', line 308

# Closes the parent's pipe to the child and waits for the child to exit.
#
# @return [Process::Status, nil] the child's exit status, or nil when there
#   is no child left to wait for (Errno::ECHILD)
def shutdown!
  @pipe.close
  # waitpid2 yields [pid, status]; only the status is of interest.
  Process.waitpid2(@pid).last
rescue Errno::ECHILD
  nil
end

#tick!(id) ⇒ Object



318
319
320
# File 'lib/ci/queue/redis/base.rb', line 318

# Sends a :tick! message carrying the given id through send_message.
#
# @param id [Object] passed along as the `id:` keyword
# @return [Object] whatever send_message returns
def tick!(id)
  payload = { id: id }
  send_message(:tick!, **payload)
end