Class: PerfectQueue::Multiprocess::ThreadProcessor
- Inherits:
-
Object
- Object
- PerfectQueue::Multiprocess::ThreadProcessor
- Defined in:
- lib/perfectqueue/multiprocess/thread_processor.rb
Direct Known Subclasses
Instance Method Summary collapse
-
#initialize(runner, config) ⇒ ThreadProcessor
constructor
A new instance of ThreadProcessor.
- #join ⇒ Object
- #keepalive ⇒ Object
- #logrotated ⇒ Object
- #restart(immediate, config) ⇒ Object
- #run ⇒ Object
- #stop(immediate) ⇒ Object
Constructor Details
#initialize(runner, config) ⇒ ThreadProcessor
23 24 25 26 27 28 29 30 31 32 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23 def initialize(runner, config) @runner = runner @running_flag = BlockingFlag.new @finish_flag = BlockingFlag.new @tm = TaskMonitor.new(config, method(:child_heartbeat)) restart(false, config) end |
Instance Method Details
#join ⇒ Object
46 47 48 49 50 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 46 def join while t = @thread t.join end end |
#keepalive ⇒ Object
52 53 54 55 56 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 52 def keepalive unless @thread @thread = Thread.new(&method(:run)) end end |
#logrotated ⇒ Object
76 77 78 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 76 def logrotated # do nothing end |
#restart(immediate, config) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 58 def restart(immediate, config) @poll_interval = config[:poll_interval] || 1.0 @log = config[:logger] @config = config @tm.stop_task(immediate) @finish_flag.set_region do @running_flag.wait while @running_flag.set? end end |
#run ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 34 def run @tm.start @running_flag.set_region do until @finish_flag.set? run_loop end end @tm.join ensure @thread = nil end |
#stop(immediate) ⇒ Object
70 71 72 73 74 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 70 def stop(immediate) @log.info immediate ? "Stopping worker thread immediately" : "Stopping worker thread gracefully" @tm.stop_task(immediate) @finish_flag.set! end |