Class: PerfectQueue::Multiprocess::ThreadProcessor
- Inherits: Object
- Inheritance chain: Object → PerfectQueue::Multiprocess::ThreadProcessor
- Defined in: lib/perfectqueue/multiprocess/thread_processor.rb
Direct Known Subclasses
Instance Method Summary
- #force_stop ⇒ Object
- #initialize(runner, processor_id, config) ⇒ ThreadProcessor (constructor): A new instance of ThreadProcessor.
- #join ⇒ Object
- #keepalive ⇒ Object
- #logrotated ⇒ Object
- #restart(immediate, config) ⇒ Object
- #run ⇒ Object
- #stop(immediate) ⇒ Object
Constructor Details
#initialize(runner, processor_id, config) ⇒ ThreadProcessor
Returns a new instance of ThreadProcessor.
23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 23
def initialize(runner, processor_id, config)
  @runner = runner
  @processor_id = processor_id
  @running_flag = BlockingFlag.new
  @finish_flag = BlockingFlag.new
  @tm = TaskMonitor.new(config, method(:child_heartbeat), method(:force_stop))
  restart(false, config)
end
Instance Method Details
#force_stop ⇒ Object
76 77 78 79 80 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 76
def force_stop
  @log.error "Force stopping processor processor_id=#{@processor_id}"
  @tm.stop_task(true)
  @finish_flag.set!
end
#join ⇒ Object
45 46 47 48 49 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 45
def join
  while t = @thread
    t.join
  end
end
#keepalive ⇒ Object
51 52 53 54 55 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 51
def keepalive
  unless @thread
    @thread = Thread.new(&method(:run))
  end
end
#logrotated ⇒ Object
82 83 84 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 82
def logrotated
  # do nothing
end
#restart(immediate, config) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 57
def restart(immediate, config)
  @poll_interval = config[:poll_interval] || 1.0
  @log = config[:logger]
  @task_prefetch = config[:task_prefetch] || 0
  @config = config
  @tm.stop_task(immediate)
  @finish_flag.set_region do
    @running_flag.wait while @running_flag.set?
  end
end
#run ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 35
def run
  @tm.start
  @running_flag.set_region do
    run_loop
  end
  @tm.join
ensure
  @thread = nil
end
#stop(immediate) ⇒ Object
70 71 72 73 74 |
# File 'lib/perfectqueue/multiprocess/thread_processor.rb', line 70
def stop(immediate)
  @log.info immediate ? "Stopping thread immediately id=#{@processor_id}" : "Stopping thread gracefully id=#{@processor_id}"
  @tm.stop_task(immediate)
  @finish_flag.set!
end