Class: Workaholic::Worker
- Inherits:
-
Object
- Object
- Workaholic::Worker
- Defined in:
- lib/workaholic/worker.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
-
#initialize ⇒ Worker
constructor
A new instance of Worker.
- #push(job) ⇒ Object
- #run(job) ⇒ Object
- #start(thread_count = 2) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ Worker
Returns a new instance of Worker.
9 10 11 12 13 |
# File 'lib/workaholic/worker.rb', line 9

# Builds a worker in its idle configuration: an empty job queue, the
# :stopped state, and a thread list.
#
# The thread list is only created when @threads is not already set, so a
# value assigned before this method runs is kept as-is.
def initialize
  @state = :stopped
  @queue = Queue.new
  @threads ||= []
end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
6 7 8 |
# File 'lib/workaholic/worker.rb', line 6

# Read-only accessor for the @queue instance variable.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
7 8 9 |
# File 'lib/workaholic/worker.rb', line 7

# Read-only accessor for the @state instance variable.
#
# @return [Object] the current value of @state
def state
  @state
end
Instance Method Details
#push(job) ⇒ Object
56 57 58 |
# File 'lib/workaholic/worker.rb', line 56

# Appends a job to the @queue instance variable by calling its +push+.
#
# @param job [Object] the item to enqueue
# @return [Object] whatever @queue.push returns
def push(job)
  @queue.push(job)
end
#run(job) ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/workaholic/worker.rb', line 46

# Executes a single job by calling its +run+ method.
#
# A ThreadError raised from inside +job.run+ is swallowed after a 0.1s
# pause. Every other exception propagates to the caller unchanged.
#
# @param job [#run] the object to execute
# @return [Object] the value of +job.run+, or the result of +sleep+ when a
#   ThreadError was swallowed
def run(job)
  job.run
rescue ThreadError
  # The original note here read "when queue is empty". Only a ThreadError
  # raised while job.run is executing can reach this clause.
  sleep 0.1
end
#start(thread_count = 2) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/workaholic/worker.rb', line 15

# Spawns worker threads that take jobs off @queue and pass them to #run.
#
# When a block is given, the worker is yielded to it; afterwards this
# method waits until the queue is empty and then calls #stop.
#
# @param thread_count [Integer] number of threads to spawn
# @yieldparam worker [Object] this worker (self)
# @return [Object, nil] the result of #stop when a block is given,
#   otherwise nil
# @raise [RunningError] unless the current state is :stopped
def start(thread_count = 2)
  raise RunningError unless state == :stopped

  @state = :running
  thread_count.times do
    worker = Thread.start do
      while %i[running stopping].include?(@state)
        begin
          job = @queue.pop(true)
        rescue ThreadError
          # A non-blocking pop raises ThreadError on an empty queue. Idle
          # briefly instead of letting the error terminate this thread.
          sleep 0.1
        else
          run job
          sleep 0.016
        end
        # Once a stop has been requested, leave after the current pass.
        break if @state == :stopping
      end
    end
    @threads.push worker
  end

  return unless block_given?

  yield self
  # Wait for the queue to drain before stopping.
  sleep 0.1 until queue.empty?
  stop
end
#stop ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/workaholic/worker.rb', line 37

# Requests shutdown and blocks until no tracked thread is alive.
#
# Sets the state to :stopping, polls the thread list once per second until
# every thread has finished, then marks the worker :stopped and clears the
# thread list.
#
# @return [Array] the new, empty thread list
def stop
  @state = :stopping
  sleep 1 while @threads.any?(&:alive?)
  @state = :stopped
  @threads = []
end