Class: Pigato::Worker
- Inherits:
-
Base
- Object
- Base
- Pigato::Worker
show all
- Defined in:
- lib/pigato/worker.rb
Constant Summary
collapse
- HEARTBEAT_LIVENESS =
3
Instance Method Summary
collapse
Methods inherited from Base
#get_iid, #get_mtx, #get_proc_id, #get_socket, #get_thread_id, #init, #sock_close, #sock_create
Constructor Details
#initialize(broker, service, conf = {}) ⇒ Worker
Returns a new instance of Worker.
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/pigato/worker.rb', line 7
def initialize broker, service, conf = {}
@broker = broker
@service = service
@conf = {
:autostart => false,
:timeout => 2500,
:reconnect => 2500
}
@conf.merge!(conf)
@heartbeat_at = Time.now - 60
@liveness = 0
@reply_to = nil
@reply_rid = nil
@reply_service = nil
init
if @conf[:autostart]
start
end
@@mtx.lock
if @@global_thread.nil?
@@global_thread = true
Thread.new do
client = Pigato::Client.new(broker, { :autostart => true })
loop do
@@mtx.lock
begin
if Time.now > @@global_heartbeat_at
@@sockets_ids.each do |iid, sid|
request = [Pigato::C_CLIENT, Pigato::W_HEARTBEAT, "worker", sid]
msg = ZMQ::Message.new
request.reverse.each{|p| msg.push(ZMQ::Frame(p))}
client.send msg
end
@@global_heartbeat_at = Time.now + 2.5
end
rescue => e
puts e
end
@@mtx.unlock
sleep 2.5
end
end
end
@@mtx.unlock
end
|
Instance Method Details
#recv ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/pigato/worker.rb', line 64
def recv
val = nil
@reply_rid = nil
@reply_to = nil
@reply_service = nil
iid = get_iid
start if @@sockets[iid] == nil && @conf[:autostart]
socket = get_socket
return nil if socket.nil?
socket.rcvtimeo = @conf[:timeout]
msg = socket.recv_message
if msg && msg.size
@liveness = HEARTBEAT_LIVENESS
= msg.pop.data
if != Pigato::W_WORKER
puts "E: Header is not Pigato::WORKER"
return nil
end
command = msg.pop.data
case command
when Pigato::W_REQUEST
@reply_to = msg.pop.data
@reply_service = msg.pop.data
msg.pop @reply_rid = msg.pop.data
val = Oj.load(msg.pop.data)
when Pigato::W_HEARTBEAT
when Pigato::W_DISCONNECT
start
else
end
else
@liveness -= 1
if @liveness == 0
sleep 0.001 * @conf[:reconnect]
start
end
end
if Time.now > @heartbeat_at
send(Pigato::W_HEARTBEAT, ['', Oj.dump({ 'concurrency' => 1 })])
@heartbeat_at = Time.now + 0.001 * (@conf[:timeout] * 1.5)
end
val
end
|
#reply(reply) ⇒ Object
59
60
61
62
|
# File 'lib/pigato/worker.rb', line 59
def reply reply
reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)])
send Pigato::W_REPLY, reply
end
|
#send(command, data = nil) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/pigato/worker.rb', line 134
def send command, data = nil
if data.nil?
data = []
elsif not data.is_a?(Array)
data = [data]
end
socket = get_socket
data = [Pigato::W_WORKER, command].concat data
msg = ZMQ::Message.new
data.reverse.each{|p| msg.push(ZMQ::Frame(p))}
socket.send_message msg
end
|
#start ⇒ Object
121
122
123
124
125
126
127
|
# File 'lib/pigato/worker.rb', line 121
def start
stop
sock_create
send Pigato::W_READY, @service
super
@liveness = HEARTBEAT_LIVENESS
end
|
#stop ⇒ Object
129
130
131
132
|
# File 'lib/pigato/worker.rb', line 129
def stop
sock_close
super
end
|