Class: Isono::ThreadPool
- Inherits:
-
Object
- Object
- Isono::ThreadPool
- Includes:
- Logger
- Defined in:
- lib/isono/thread_pool.rb
Defined Under Namespace
Classes: TimeoutError, WorkerTerminateError
Instance Method Summary collapse
-
#barrier(immediate = true, time_out = nil, &blk) ⇒ Object
Sends a block to a worker thread, similar to pass().
- #clear ⇒ Object
- #graceful_shutdown2 ⇒ Object
-
#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #member_thread?(thread = Thread.current) ⇒ Boolean
-
#pass(immediate = true, &blk) ⇒ Object
Pass a block to a worker thread.
-
#shutdown ⇒ Object
Immediately shuts down all the worker threads.
- #shutdown_graceful(timeout) ⇒ Object
Methods included from Logger
Constructor Details
#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/isono/thread_pool.rb', line 11
#
# Creates the pool and starts its worker threads. Each worker pops callable
# jobs from the shared queue and runs them until it is told to terminate.
#
# @param worker_num [Integer] number of worker threads to start.
# @param name [String, nil] handed to set_instance_logger and kept in @name.
# @param opts [Hash] options merged over the defaults.
#   :stucked_queue_num (default 20) is the queue size above which a warning
#   is logged, at most once every 5 seconds.
def initialize(worker_num=1, name=nil, opts={})
  set_instance_logger(name)
  @queue = ::Queue.new
  @name = name
  @opts = {:stucked_queue_num=>20}.merge(opts)
  @last_stuck_warn_at = Time.now
  @worker_threads = {}

  worker_num.times {
    t = Thread.new {
      begin
        # A nil/false job ends the loop and therefore the worker.
        while op = @queue.pop
          if @queue.size > @opts[:stucked_queue_num] && Time.now - @last_stuck_warn_at > 5.0
            logger.warn("too many stacked jobs: #{@queue.size}")
            @last_stuck_warn_at = Time.now
          end
          op.call
        end
      rescue WorkerTerminateError
        # Someone asked this worker to terminate: fall through to the ensure
        # clause and let the thread finish. (A `break` here is not inside any
        # loop and would raise LocalJumpError in the thread.)
      rescue Exception => e
        # A failing job must not kill the worker: log it and resume popping.
        logger.error(e)
        retry
      ensure
        # Capture the worker here. The scheduled block runs on the
        # EventMachine reactor thread, where Thread.current is no longer
        # this worker.
        worker = Thread.current
        EM.schedule {
          @worker_threads.delete(worker.__id__)
          logger.debug("#{worker} is being terminated")
        }
      end
    }
    @worker_threads[t.__id__] = t
  }
end
Instance Method Details
#barrier(immediate = true, time_out = nil, &blk) ⇒ Object
Sends a block to a worker thread, similar to pass(), but makes the calling thread wait until the block has been processed by a worker thread.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/isono/thread_pool.rb', line 67
#
# Runs +blk+ on a worker thread and blocks the caller until it has finished.
#
# @param immediate [Boolean] when true and the caller is itself a worker of
#   this pool, the block is called inline instead of being queued.
# @param time_out [Numeric, nil] seconds after which a TimeoutError is
#   delivered to the caller; nil waits without limit.
# @return [Object] the value returned by the block.
# @raise [TimeoutError] when +time_out+ elapses before a result arrives.
# @raise [Exception] whatever the block raised, re-raised in the caller.
def barrier(immediate=true, time_out=nil, &blk)
  return blk.call if immediate && member_thread?

  q = ::Queue.new
  time_start = ::Time.now

  pass do
    begin
      q << blk.call
    rescue Exception => e
      # Hand the failure over to the waiting caller instead of the worker.
      q << e
    end
  end

  em_sig = nil
  if time_out
    # NOTE(review): presumably the timer only fires while the reactor thread
    # is free, i.e. when the caller is not the reactor itself - confirm.
    em_sig = EventMachine.add_timer(time_out) {
      q << TimeoutError.new
    }
  end

  # Blocks until either the job result or the timeout marker arrives.
  res = q.shift
  # Only cancel a timer that was actually registered.
  EventMachine.cancel_timer(em_sig) if em_sig

  time_elapsed = ::Time.now - time_start
  logger.debug("Elapsed time for #{blk}: #{time_elapsed} secs") if time_elapsed > 0.05

  raise res if res.is_a?(Exception)
  res
end
#clear ⇒ Object
101 102 103 |
# File 'lib/isono/thread_pool.rb', line 101
#
# Discards every job that is still waiting in the queue. Jobs that a worker
# has already popped are not affected.
def clear
  @queue.clear
end
#graceful_shutdown2 ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/isono/thread_pool.rb', line 151
#
# Swaps in a fresh queue so that new jobs no longer land in the current one,
# waits until the previous queue is empty, then raises WorkerTerminateError
# in every worker thread.
def graceful_shutdown2
  # Make new jobs push to a dummy queue.
  old_queue = @queue
  @queue = ::Queue.new

  # Wait until the previous queue becomes empty.
  # NOTE(review): workers look up @queue on every pop, so after the swap they
  # presumably take jobs from the new queue - confirm the old one can drain.
  unless old_queue.empty?
    logger.info("Waiting for #{old_queue.size} worker jobs in #{self}")
    sleep 1 until old_queue.empty?
  end

  # @worker_threads maps thread id => Thread; only the threads are needed.
  @worker_threads.each_value { |t|
    t.raise WorkerTerminateError
  }
end
#member_thread?(thread = Thread.current) ⇒ Boolean
113 114 115 |
# File 'lib/isono/thread_pool.rb', line 113
#
# Tells whether +thread+ is one of this pool's worker threads.
#
# @param thread [Thread] thread to look up; defaults to the calling thread.
# @return [Boolean] true when the thread's id is registered in the pool.
def member_thread?(thread=Thread.current)
  @worker_threads.key?(thread.__id__)
end
#pass(immediate = true, &blk) ⇒ Object
Passes a block to a worker thread. The job stays queued until a worker thread picks it up.
53 54 55 56 57 58 59 |
# File 'lib/isono/thread_pool.rb', line 53
#
# Hands +blk+ to the pool. The block is queued for a worker thread, unless
# the caller already is a worker and +immediate+ is true, in which case it is
# called inline.
#
# @param immediate [Boolean] allow inline execution on a worker thread.
# @return [Object] the block's value when run inline, otherwise the queue.
# @raise [ArgumentError] when no block is given. A queued nil would be popped
#   as a false job and end the worker loop that picked it up.
def pass(immediate=true, &blk)
  raise ArgumentError, "no block given" unless blk

  return blk.call if immediate && member_thread?

  @queue << blk
end
#shutdown ⇒ Object
Immediately shuts down all the worker threads.
106 107 108 109 110 111 |
# File 'lib/isono/thread_pool.rb', line 106
#
# Raises WorkerTerminateError in every worker thread without waiting for
# queued or running jobs.
def shutdown
  @worker_threads.each_value { |t|
    t.raise(WorkerTerminateError)
    # Yield the scheduler so the interrupted worker gets a chance to run.
    Thread.pass
  }
end
#shutdown_graceful(timeout) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/isono/thread_pool.rb', line 117
#
# Queues one terminate job per worker behind the pending jobs, waits until
# every worker has reached its terminate job or the timeout has fired, and
# finally calls #shutdown.
#
# @param timeout [Numeric, nil] seconds to wait for the workers; nil or a
#   non-positive value waits without limit.
def shutdown_graceful(timeout)
  term_sig_q = ::Queue.new
  worker_num = @worker_threads.size
  em_sig = nil

  # Enqueue the terminate jobs: each one reports in, then stops its worker.
  worker_num.times {
    @queue.push proc {
      term_sig_q.enq(1)
      raise WorkerTerminateError
    }
  }

  if timeout && timeout > 0.0
    # On timeout, stand in for every worker so the wait below cannot hang.
    em_sig = EventMachine.add_timer(timeout) {
      worker_num.times {
        term_sig_q << TimeoutError.new
      }
    }
  end

  # Collect exactly one signal per worker; timeout markers count as failures.
  timeout_workers = 0
  worker_num.times {
    timeout_workers += 1 if term_sig_q.deq.is_a?(TimeoutError)
  }

  logger.error("#{timeout_workers} of worker threads timed out during the cleanup") if timeout_workers > 0
ensure
  shutdown
  # Only cancel a timer that was actually registered.
  EventMachine.cancel_timer(em_sig) if em_sig
end