Class: Workaholic::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/workaholic/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Worker

Returns a new instance of Worker.



9
10
11
12
13
# File 'lib/workaholic/worker.rb', line 9

# Builds an idle worker: an empty job queue, the :stopped state (the only
# state from which #start will spawn threads) and an empty thread list.
def initialize
  @queue = Queue.new
  @state = :stopped
  # Plain assignment: a freshly allocated object never has @threads set, so
  # the former `||=` guard could not take effect during construction.
  @threads = []
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



6
7
8
# File 'lib/workaholic/worker.rb', line 6

# Read-only accessor for the job queue.
#
# @return [Queue] the queue created in #initialize, to which #push appends
#   jobs
def queue
  @queue
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



7
8
9
# File 'lib/workaholic/worker.rb', line 7

# Read-only accessor for the worker's lifecycle state.
#
# @return [Symbol] the current value of @state; this class assigns
#   :stopped, :running and :stopping to it
def state
  @state
end

Instance Method Details

#push(job) ⇒ Object



56
57
58
# File 'lib/workaholic/worker.rb', line 56

# Appends +job+ to the internal job queue.
#
# @param job [#run] object whose +run+ method is invoked when the job is
#   executed
# @return [Queue] the underlying queue
def push(job)
  @queue << job
end

#run(job) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/workaholic/worker.rb', line 46

# Executes a single job by calling its +run+ method.
#
# A ThreadError raised by the job is swallowed and followed by a short
# pause; every other exception propagates to the caller unchanged.
#
# The previous `rescue e` clause did not bind the exception (that is spelled
# `rescue => e`); it evaluated `e` as an exception class, so any
# non-ThreadError failure surfaced as a NameError that hid the real error.
# Re-raising is the default behaviour, so the clause is simply dropped.
#
# @param job [#run] the job to execute
# @return [Object] the value of +job.run+, or the result of +sleep+ when a
#   ThreadError was rescued
def run(job)
  job.run
rescue ThreadError
  # NOTE(review): originally annotated "when queue is empty", but the
  # non-blocking pop happens in the caller (#start), so this branch only
  # fires when job.run itself raises ThreadError.
  sleep 0.1
end

#start(thread_count = 2) ⇒ Object

Raises:

  • (RunningError) — when the worker is not in the :stopped state


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/workaholic/worker.rb', line 15

# Spawns +thread_count+ worker threads that take jobs off the queue and
# hand them to #run.
#
# With a block, the worker is yielded to the block (typically to push
# jobs); afterwards this method waits until the queue is empty and then
# calls #stop.
#
# @param thread_count [Integer] number of worker threads to spawn
# @yield [worker] optional block that receives this worker
# @raise [RunningError] unless the current state is :stopped
# @return [Object, nil] the result of #stop when a block is given, else nil
def start(thread_count = 2)
  raise RunningError unless state == :stopped

  @state = :running
  thread_count.times do
    worker = Thread.start do
      while [:running, :stopping].include?(@state)
        begin
          # Non-blocking pop raises ThreadError when the queue is empty.
          # It must be rescued here: the pop used to be evaluated as an
          # argument to #run, outside that method's rescue, so an empty
          # queue killed the worker thread.
          job = @queue.pop(true)
        rescue ThreadError
          break if @state == :stopping
          sleep 0.1 # idle until more work arrives
          next
        end

        run job
        sleep 0.016
        break if @state == :stopping
      end
    end
    @threads.push worker
  end

  if block_given?
    yield self
    # Wait for the queue to drain; #stop then waits for in-flight jobs.
    sleep 0.1 while queue.size > 0
    stop
  end
end

#stop ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/workaholic/worker.rb', line 37

# Signals the worker threads to finish and blocks until none is alive.
#
# The state is :stopping while waiting (polled once per second) and
# :stopped afterwards; the thread list is then reset.
#
# @return [Array] the new, empty thread list
def stop
  @state = :stopping
  sleep 1 while @threads.any?(&:alive?)
  @state = :stopped
  @threads = []
end