Class: CI::Queue::Redis::Base::HeartbeatProcess
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - CI::Queue::Redis::Base::HeartbeatProcess
- Defined in:
- lib/ci/queue/redis/base.rb
Instance Method Summary
- #boot! ⇒ Object
-
#initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:) ⇒ HeartbeatProcess
constructor
A new instance of HeartbeatProcess.
- #shutdown! ⇒ Object
- #tick!(id) ⇒ Object
Constructor Details
#initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:) ⇒ HeartbeatProcess
Returns a new instance of HeartbeatProcess.
267 268 269 270 271 272 273 274 |
# File 'lib/ci/queue/redis/base.rb', line 267
# Records the settings that #boot! later forwards, in this same order, as
# command-line arguments to the spawned monitor script.
#
# @param redis_url [Object] stored as @redis_url
# @param zset_key [Object] stored as @zset_key
# @param processed_key [Object] stored as @processed_key
# @param owners_key [Object] stored as @owners_key
# @param worker_queue_key [Object] stored as @worker_queue_key
# @param entry_delimiter [Object] stored as @entry_delimiter
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:)
  @redis_url, @zset_key, @processed_key = redis_url, zset_key, processed_key
  @owners_key, @worker_queue_key = owners_key, worker_queue_key
  @entry_delimiter = entry_delimiter
end
Instance Method Details
#boot! ⇒ Object
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/ci/queue/redis/base.rb', line 276
# Spawns the monitor script (monitor.rb next to this file) as a child Ruby
# process and waits up to 10 seconds for it to write something on its stdout.
#
# Two pipes are created: the write end of the first is kept as @pipe and its
# read end becomes the child's stdin; the second carries the child's stdout
# back and is used here only as a readiness signal.
#
# @return [IO] @pipe, the binary-mode write end connected to the child's stdin
# @raise [RuntimeError] if the child's stdout is not readable within 10 seconds
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  @pid = Process.spawn(
    RbConfig.ruby,
    ::File.join(__dir__, "monitor.rb"),
    @redis_url,
    @zset_key,
    @processed_key,
    @owners_key,
    @worker_queue_key,
    @entry_delimiter,
    in: child_read,
    out: child_write,
  )
  # The child has its own copies of these ends; the parent no longer needs them.
  child_read.close
  child_write.close

  begin
    unless ready_pipe.wait_readable(10)
      # Signal 0 delivers nothing; it only raises if the pid cannot be signalled.
      Process.kill(0, @pid)
      # NOTE(review): nothing here terminates the child, so this wait blocks
      # until the child exits on its own — confirm that is intended.
      Process.wait(@pid)
      raise "Monitor child wasn't ready after 10 seconds"
    end
    ready_pipe.gets
  ensure
    # Close the readiness pipe on every path, including the timeout path,
    # so the descriptor is not leaked when boot fails.
    ready_pipe.close
  end

  # Check the process is alive.
  Process.kill(0, @pid)
  @pipe
end
#shutdown! ⇒ Object
308 309 310 311 312 313 314 315 316 |
# File 'lib/ci/queue/redis/base.rb', line 308
# Closes the pipe to the child and waits for the child process to exit.
#
# Safe to call when #boot! never ran (no pipe, no pid): it then does nothing
# and returns nil instead of raising on the missing state.
#
# @return [Process::Status, nil] the child's exit status, or nil when there is
#   no child to wait for (never booted, or already reaped)
def shutdown!
  # Presumably the child treats EOF on its stdin as the cue to exit — confirm
  # against monitor.rb.
  @pipe&.close
  return unless @pid

  Process.waitpid2(@pid).last
rescue Errno::ECHILD
  # The child was already reaped (e.g. a repeated shutdown!).
  nil
end
#tick!(id) ⇒ Object
318 319 320 |
# File 'lib/ci/queue/redis/base.rb', line 318
# Forwards a :tick! message carrying the given id.
#
# NOTE(review): the extracted listing lost the callee name, leaving a bare
# argument list that does not parse. `send_message` is assumed to be the
# private helper of this class that delivers messages over @pipe — confirm
# against lib/ci/queue/redis/base.rb.
#
# @param id [Object] identifier passed through as the `id:` keyword
# @return [Object] whatever the message helper returns
def tick!(id)
  send_message(:tick!, id: id)
end