Class: JobDispatch::Worker
- Inherits:
-
Object
- Object
- JobDispatch::Worker
show all
- Defined in:
- lib/job_dispatch/worker.rb,
lib/job_dispatch/worker/item.rb,
lib/job_dispatch/worker/socket.rb
Overview
This class is the main worker loop. Run it as a whole process or just as a thread in a multi-threaded worker process.
Defined Under Namespace
Classes: Item, Socket, StopError
Constant Summary
collapse
- IDLE_TIME =
3
- IDLE_COUNT =
10
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(connect_address, options = {}) ⇒ Worker
Returns a new instance of Worker.
21
22
23
24
25
26
27
|
# File 'lib/job_dispatch/worker.rb', line 21
def initialize(connect_address, options={})
options ||= {}
@connect_address = connect_address
@queue = options[:queue] || 'default'
@running = false
@item_class = options[:item_class] || Worker::Item
end
|
Instance Attribute Details
#item_class ⇒ Object
Returns the value of attribute item_class.
19
20
21
|
# File 'lib/job_dispatch/worker.rb', line 19
def item_class
@item_class
end
|
#queue ⇒ Object
Returns the value of attribute queue.
18
19
20
|
# File 'lib/job_dispatch/worker.rb', line 18
def queue
@queue
end
|
#socket ⇒ Object
Returns the value of attribute socket.
17
18
19
|
# File 'lib/job_dispatch/worker.rb', line 17
def socket
@socket
end
|
Class Method Details
.touch(timeout = nil) ⇒ Object
100
101
102
103
104
105
106
107
|
# File 'lib/job_dispatch/worker.rb', line 100
def self.touch(timeout=nil)
sock = Thread.current["JobDispatch::Worker.socket"]
job_id = Thread.current["JobDispatch::Worker.job_id"]
if sock && job_id
sock.send_touch(job_id, timeout)
JobDispatch.logger.debug { "touching job #{job_id}"}
end
end
|
Instance Method Details
#ask_for_work ⇒ Object
81
82
83
|
# File 'lib/job_dispatch/worker.rb', line 81
def ask_for_work
socket.ask_for_work(queue)
end
|
#connect ⇒ Object
29
30
31
32
|
# File 'lib/job_dispatch/worker.rb', line 29
def connect
@socket ||= Worker::Socket.new(@connect_address, item_class)
Thread.current["JobDispatch::Worker.socket"] = @socket
end
|
#disconnect ⇒ Object
34
35
36
37
38
39
40
|
# File 'lib/job_dispatch/worker.rb', line 34
def disconnect
if @socket
@socket.close
@socket = nil
Thread.current["JobDispatch::Worker.socket"] = nil
end
end
|
#run ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/job_dispatch/worker.rb', line 42
def run
@running = true
@running_thread = Thread.current
while running?
connect
ask_for_work rescue StopError
idle_count = 0
poller = ZMQ::Poller.new
poller.register(socket.poll_item)
while running? and idle_count < IDLE_COUNT
begin
poller.poll(IDLE_TIME)
if poller.readables.include?(socket.socket)
process
idle_count = 0
else
idle
idle_count += 1
end
rescue Interrupt, StopError
JobDispatch.logger.info("Worker stopping.")
stop
disconnect
connect
send_goodbye
sleep(0.1) end
end
disconnect
end
end
|
#running? ⇒ Boolean
89
90
91
|
# File 'lib/job_dispatch/worker.rb', line 89
def running?
@running
end
|
#send_goodbye ⇒ Object
85
86
87
|
# File 'lib/job_dispatch/worker.rb', line 85
def send_goodbye
socket.send_goodbye(queue)
end
|
#stop ⇒ Object
93
94
95
96
97
98
|
# File 'lib/job_dispatch/worker.rb', line 93
def stop
if running?
@running_thread.raise StopError unless @running_thread == Thread.current
@running = false
end
end
|