Class: CI::Queue::Redis::Base::HeartbeatProcess
- Inherits: Object
- Class hierarchy: Object → CI::Queue::Redis::Base::HeartbeatProcess
- Defined in:
- lib/ci/queue/redis/base.rb
Instance Method Summary
- #boot! ⇒ Object
- #initialize(redis_url, zset_key, owners_key, leases_key) ⇒ HeartbeatProcess (constructor): A new instance of HeartbeatProcess.
- #shutdown! ⇒ Object
- #tick!(id, lease) ⇒ Object
Constructor Details
#initialize(redis_url, zset_key, owners_key, leases_key) ⇒ HeartbeatProcess
Returns a new instance of HeartbeatProcess.
267 268 269 270 271 272 |
# File 'lib/ci/queue/redis/base.rb', line 267

# Records the settings that #boot! later passes, in this order, as
# command-line arguments to the monitor child process.
#
# @param redis_url [String] first argument given to the monitor script
# @param zset_key [String] second argument given to the monitor script
# @param owners_key [String] third argument given to the monitor script
# @param leases_key [String] fourth argument given to the monitor script
# @return [HeartbeatProcess] a new instance of HeartbeatProcess
def initialize(redis_url, zset_key, owners_key, leases_key)
  @redis_url = redis_url
  @zset_key = zset_key
  @owners_key = owners_key
  @leases_key = leases_key
end
Instance Method Details
#boot! ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/ci/queue/redis/base.rb', line 274

# Spawns `monitor.rb` (located next to this file) as a child Ruby process and
# waits up to 10 seconds for it to write to its stdout.
#
# The child's stdin is the read end of a pipe whose write end is kept in
# `@pipe`; the child's stdout is the write end of a second pipe that is only
# used for the readiness check.
#
# @return [IO] the binary-mode write end of the pipe connected to the child's stdin
# @raise [RuntimeError] if the child produced no output within 10 seconds
# @raise [Errno::ESRCH] if the child no longer exists after the readiness check
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode

  begin
    @pid = Process.spawn(
      RbConfig.ruby,
      ::File.join(__dir__, "monitor.rb"),
      @redis_url,
      @zset_key,
      @owners_key,
      @leases_key,
      in: child_read,
      out: child_write,
    )
  ensure
    # The parent never uses the child's ends; close them even if spawn raised.
    child_read.close
    child_write.close
  end

  unless ready_pipe.wait_readable(10)
    # Signal 0 only probes for existence, so waiting after it would block for
    # as long as a hung child stays alive. Terminate the child first so the
    # wait below returns and the error is actually raised.
    @pipe.close
    begin
      Process.kill("KILL", @pid)
    rescue Errno::ESRCH
      # The child is already gone; nothing to terminate.
    end
    Process.wait(@pid)
    raise "Monitor child wasn't ready after 10 seconds"
  end

  # NOTE(review): `gets` returns nil on EOF (child closed stdout without
  # writing); that case is not distinguished from readiness here — confirm
  # against monitor.rb whether it should be.
  ready_pipe.gets

  # Check the process is alive.
  Process.kill(0, @pid)
  @pipe
ensure
  # Release the readiness pipe on every path, including timeout and errors.
  ready_pipe&.close
end
#shutdown! ⇒ Object
304 305 306 307 308 309 310 311 312 |
# File 'lib/ci/queue/redis/base.rb', line 304

# Closes the pipe connected to the child's stdin and waits for the child to exit.
#
# Closing `@pipe` makes the child see EOF on its stdin; presumably the monitor
# script exits on EOF — confirm against monitor.rb.
#
# Safe to call when #boot! never ran or failed before the child was spawned:
# a missing pipe or pid is skipped instead of raising.
#
# @return [Process::Status, nil] the child's exit status, or nil when there is
#   no child to wait for (never spawned, or already reaped)
def shutdown!
  @pipe&.close
  return unless @pid

  _pid, status = Process.waitpid2(@pid)
  status
rescue Errno::ECHILD
  # The child was already reaped (or is not our child); nothing to report.
  nil
end
#tick!(id, lease) ⇒ Object
314 315 316 |
# File 'lib/ci/queue/redis/base.rb', line 314

# Sends a `:tick!` message carrying the given id and lease.
#
# NOTE(review): the callee name was missing from this listing (the call read
# `(:tick!, ...)`, which is not valid Ruby). `send_message` is restored on the
# assumption that it is a private helper of this class — confirm against the
# upstream file.
#
# @param id [Object] identifier forwarded unchanged under the `id:` key
# @param lease [#to_s] lease value; converted with `to_s` before being sent
# @return [Object] whatever the underlying send helper returns
def tick!(id, lease)
  send_message(:tick!, id: id, lease: lease.to_s)
end