Class: ThreadStorm::Worker

Inherits:
Object show all
Defined in:
lib/thread_storm/worker.rb

Overview

:nodoc:

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, sentinel, options) ⇒ Worker

Takes the thread-safe queue, the sentinel, and the options from the thread pool, then starts the worker's thread.



6
7
8
9
10
11
12
# File 'lib/thread_storm/worker.rb', line 6

# Stores the collaborators handed over by the pool and starts the worker's
# thread.
#
# queue    - queue that executions are popped from.
# sentinel - object whose #synchronize guards access to the queue.
# options  - Hash read for :timeout and :timeout_method.
def initialize(queue, sentinel, options)
  @queue, @sentinel, @options = queue, sentinel, options
  # No execution has been picked up yet.
  @execution = nil
  # Start the thread last so every instance variable is set before #run begins.
  @thread = Thread.new { run }
end

Instance Attribute Details

#execution ⇒ Object (readonly)

Returns the value of attribute execution.



3
4
5
# File 'lib/thread_storm/worker.rb', line 3

# Returns whatever is currently stored in @execution: nil while the worker is
# idle, otherwise the item most recently popped off the queue.
def execution
  @execution
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



3
4
5
# File 'lib/thread_storm/worker.rb', line 3

# Returns the Thread created in the constructor, i.e. the one executing #run.
def thread
  @thread
end

Instance Method Details

#busy? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/thread_storm/worker.rb', line 67

# True while the worker holds an execution that is not the :die marker.
#
# Returns false when @execution is nil (idle) or when #die? is true.
def busy?
  return false unless @execution

  !die?
end

#die? ⇒ Boolean

True if this worker’s thread should die.

Returns:

  • (Boolean)


72
73
74
# File 'lib/thread_storm/worker.rb', line 72

# True when the item currently held in @execution is the :die symbol, which
# #run and #pop_and_process_execution treat as the signal to stop.
# NOTE(review): presumably the pool pushes :die onto the queue to shut a
# worker down -- confirm against the pool implementation.
def die?
  @execution == :die
end

#pop_and_process_execution ⇒ Object

Pop an execution off the queue and process it, or pass off control to a different thread.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/thread_storm/worker.rb', line 28

# Marks this worker idle, waits until the queue is non-empty, pops the next
# item into @execution and, unless that item is the :die marker, processes it.
#
# NOTE(review): e_cond and p_cond are yielded by @sentinel.synchronize; they
# look like condition variables tied to the sentinel's lock -- confirm against
# the sentinel class before relying on that.
def pop_and_process_execution
  @sentinel.synchronize do |e_cond, p_cond|
    # Become idle and signal that we're idle.
    @execution = nil
    e_cond.signal
    
    # Give up the lock and wait until there is work to do.
    p_cond.wait_while{ @queue.empty? }
    
    # Get the work to do (implicitly becoming busy).
    @execution = @queue.pop
  end
  
  # The popped item may be the :die marker rather than real work; in that
  # case there is nothing to process.
  process_execution_with_timeout unless die?
end

#process_execution ⇒ Object

Call the execution's block with its arguments and store the result as the execution's value.



63
64
65
# File 'lib/thread_storm/worker.rb', line 63

# Calls the current execution's block with the execution's arguments and
# stores the result through the execution's value= writer.
#
# Returns the value produced by the block.
def process_execution
  job = execution
  job.value = job.block.call(*job.args)
end

#process_execution_with_timeout ⇒ Object

Process the execution, handling timeouts and exceptions.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/thread_storm/worker.rb', line 45

# Runs the current execution, bracketed by start!/finish! on the execution.
#
# When a timeout is configured, the work is wrapped in a call to
# timeout_method with that timeout; otherwise it runs directly. A
# Timeout::Error marks the execution as timed out; any other exception is
# stored on the execution instead of propagating.
def process_execution_with_timeout
  execution.start!
  begin
    limit = timeout
    if limit
      timeout_method.call(limit) { process_execution }
    else
      process_execution
    end
  rescue Timeout::Error
    execution.timed_out!
  rescue Exception => error
    # Broad rescue: non-StandardError failures raised by the work are also
    # recorded on the execution rather than escaping this method.
    execution.exception = error
  ensure
    # Runs on every path, including after a timeout or an exception.
    execution.finish!
  end
end

#run ⇒ Object

Pop executions and process them until we’re signaled to die.



23
24
25
# File 'lib/thread_storm/worker.rb', line 23

# Worker loop: keeps popping and processing executions until #die? is true.
#
# Returns nil.
def run
  pop_and_process_execution until die?
end

#timeout ⇒ Object



14
15
16
# File 'lib/thread_storm/worker.rb', line 14

# Returns the :timeout entry of the options, remembering it in @timeout once a
# truthy value has been read. A nil/false entry is looked up again next call.
def timeout
  return @timeout if @timeout

  @timeout = @options[:timeout]
end

#timeout_method ⇒ Object



18
19
20
# File 'lib/thread_storm/worker.rb', line 18

# Returns the :timeout_method entry of the options, remembering it in
# @timeout_method once a truthy value has been read.
def timeout_method
  return @timeout_method if @timeout_method

  @timeout_method = @options[:timeout_method]
end