Class: PerfectQueue::Multiprocess::ThreadProcessor
- Inherits: Object
  - Object
    - PerfectQueue::Multiprocess::ThreadProcessor
- Defined in:
- lib/perfectqueue/multiprocess/thread_processor.rb
Direct Known Subclasses
Instance Method Summary
- #force_stop ⇒ Object
- #initialize(runner, processor_id, config) ⇒ ThreadProcessor (constructor) — A new instance of ThreadProcessor.
- #join ⇒ Object
- #keepalive ⇒ Object
- #logrotated ⇒ Object
- #restart(immediate, config) ⇒ Object
- #run ⇒ Object
- #stop(immediate) ⇒ Object
Constructor Details
#initialize(runner, processor_id, config) ⇒ ThreadProcessor
Returns a new instance of ThreadProcessor.
23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23 def initialize(runner, processor_id, config) @runner = runner @processor_id = processor_id @running_flag = BlockingFlag.new @finish_flag = BlockingFlag.new @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop)) restart(false, config) end |
Instance Method Details
#force_stop ⇒ Object
78 79 80 81 82 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 78 def force_stop @log.error "Force stopping processor processor_id=#{@processor_id}" @tm.stop_task(true) @finish_flag.set! end |
#join ⇒ Object
47 48 49 50 51 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 47 def join while t = @thread t.join end end |
#keepalive ⇒ Object
53 54 55 56 57 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 53 def keepalive unless @thread @thread = Thread.new(&method(:run)) end end |
#logrotated ⇒ Object
84 85 86 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 84 def logrotated # do nothing end |
#restart(immediate, config) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 59 def restart(immediate, config) @poll_interval = config[:poll_interval] || 1.0 @log = config[:logger] @task_prefetch = config[:task_prefetch] || 0 @config = config @tm.stop_task(immediate) @finish_flag.set_region do @running_flag.wait while @running_flag.set? end end |
#run ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 35 def run @tm.start @running_flag.set_region do until @finish_flag.set? run_loop end end @tm.join ensure @thread = nil end |
#stop(immediate) ⇒ Object
72 73 74 75 76 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 72 def stop(immediate) @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}" @tm.stop_task(immediate) @finish_flag.set! end |