Class: CI::Queue::Redis::Base::HeartbeatProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/base.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis_url, zset_key, owners_key, leases_key) ⇒ HeartbeatProcess

Returns a new instance of HeartbeatProcess.



267
268
269
270
271
272
# File 'lib/ci/queue/redis/base.rb', line 267

# Records the connection details the monitor child process will need.
#
# Nothing is spawned here; the values are only stored and later passed as
# command-line arguments to the monitor script by #boot!.
#
# @param redis_url [String] URL of the Redis server
# @param zset_key [String] name of the sorted-set key
# @param owners_key [String] name of the owners key
# @param leases_key [String] name of the leases key
def initialize(redis_url, zset_key, owners_key, leases_key)
  @leases_key = leases_key
  @owners_key = owners_key
  @zset_key = zset_key
  @redis_url = redis_url
end

Instance Method Details

#boot! ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/ci/queue/redis/base.rb', line 274

# Spawns the monitor script as a child process and waits for it to report in.
#
# The child runs under the current Ruby interpreter and receives the Redis
# URL and key names as command-line arguments. Its stdin is the read end of
# the pipe returned by this method; its stdout is a second pipe that the
# parent watches to learn when the child is up.
#
# If the child's stdout does not become readable within +ready_timeout+
# seconds, the child is killed and reaped before the error is raised, so a
# hung child can neither block the caller forever nor be left running.
#
# @param ready_timeout [Numeric] seconds to wait for the child's stdout to
#   become readable
# @return [IO] write end of the pipe connected to the child's stdin
# @raise [RuntimeError] if the child's stdout is not readable in time
# @raise [Errno::ESRCH] if the child pid no longer exists after the handshake
def boot!(ready_timeout: 10)
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  @pid = Process.spawn(
    RbConfig.ruby,
    ::File.join(__dir__, "monitor.rb"),
    @redis_url,
    @zset_key,
    @owners_key,
    @leases_key,
    in: child_read,
    out: child_write,
  )
  # The child holds its own copies of these ends. Keeping ours open would
  # prevent ready_pipe from ever seeing EOF if the child exits.
  child_read.close
  child_write.close

  unless ready_pipe.wait_readable(ready_timeout)
    # Closing the child's stdin and sending SIGKILL guarantees the wait
    # below returns; merely probing with signal 0 would leave the parent
    # blocked for as long as the child keeps running.
    @pipe.close
    begin
      Process.kill(:KILL, @pid)
      Process.wait(@pid)
    rescue Errno::ESRCH, Errno::ECHILD
      # The child is already gone or was reaped elsewhere.
    end
    raise "Monitor child wasn't ready after #{ready_timeout} seconds"
  end

  ready_pipe.gets
  # Signal 0 delivers nothing; it only raises Errno::ESRCH when the pid no
  # longer exists.
  Process.kill(0, @pid)
  @pipe
ensure
  # Release the readiness pipe on every path, including the failure ones.
  ready_pipe.close if ready_pipe && !ready_pipe.closed?
end

#shutdown! ⇒ Object



304
305
306
307
308
309
310
311
312
# File 'lib/ci/queue/redis/base.rb', line 304

# Closes the pipe to the child process and waits for the child to exit.
#
# @return [Process::Status, nil] the child's exit status, or nil when there
#   is no such child left to wait for (Errno::ECHILD)
def shutdown!
  @pipe.close
  exit_status =
    begin
      Process.waitpid2(@pid).last
    rescue Errno::ECHILD
      nil
    end
  exit_status
end

#tick!(id, lease) ⇒ Object



314
315
316
# File 'lib/ci/queue/redis/base.rb', line 314

# Sends a :tick! message carrying the given id and lease.
#
# @param id [Object] identifier forwarded unchanged as the +id+ field
# @param lease [#to_s] lease value; sent in its string form
# @return [Object] whatever send_message returns
def tick!(id, lease)
  lease_text = lease.to_s
  send_message(:tick!, id: id, lease: lease_text)
end