Class: EventMachine::Protocols::Zmq2::Req
- Defined in:
- lib/em/protocols/zmq2/req.rb
Overview
ZMQ socket which acts like REQ. It also reacts on connection busyness, so that it is a bit smarter, than ZMQ REQ. It generates unique request ids and uses ZMQ routing scheme for mapping replies.
The only visible change from PreReq is less frequent send_request
false return
Note, that on subclassing, you should override #receive_reply
and not #receive_message
, and you should use #send_request
instead of #send_message
class MyReq < EM::Protocols::Zmq2::Req
def receive_reply(message, data, request_id)
puts "received message #{message} and stored data #{data}
end
end
req = MyReq.new
if request_id = req.send_request('hi', 'ho')
puts "Message sent"
end
class TimeoutedReq < EM::Protocols::Zmq2::PreReq
def initialize(opts={})
super
@timeout = opts[:timeout] || 1
end
def cancel_request(request_id)
data = super
EM.cancel_timer(data[:timer]) if data[:timer]
data[:data]
end
def receive_reply(message, data)
if timer = data[:timer]
EM.cancel_timer(data[:timer])
end
puts "receive message #{message.inspect}, associated data #{data[:data].inspect}"
end
def send_request(message, data)
data = {data: data}
if request_id = super(message, data)
data[:timer] = EM.add_timer(@timeout){ cancel_request(request_id) }
end
request_id
end
end
req = TimeoutedReq.new
req.bind('ipc://req')
callback_data = { some_data: "" }
if request_id = req.send_request(['hello', 'world'], callback_data)
puts "Request sent with request_id #{request_id}"
else
puts "No free peers and highwatermark reached"
end
Instance Method Summary collapse
-
#cancel_request(request_id) ⇒ Object
cancel pending request, so that callback will not be called on incoming message.
-
#flush_all_queue ⇒ Object
:nodoc:.
- #flush_queue(even_if_busy = false) ⇒ Object
-
#initialize(opts = {}) ⇒ Req
constructor
A new instance of Req.
- #peer_free(peer, connection) ⇒ Object
- #send_request(message, data) ⇒ Object
Methods inherited from PreReq
#form_request, #receive_message, #receive_reply
Methods inherited from PreDealer
#choose_peer, #receive_message, #receive_message_and_peer, #send_message
Constructor Details
#initialize(opts = {}) ⇒ Req
Returns a new instance of Req.
167 168 169 170 |
# File 'lib/em/protocols/zmq2/req.rb', line 167 def initialize(opts = {}) super @requests = {} end |
Instance Method Details
#cancel_request(request_id) ⇒ Object
cancel pending request, so that callback will not be called on incoming message
175 176 177 178 |
# File 'lib/em/protocols/zmq2/req.rb', line 175 def cancel_request(request_id) @requests.delete request_id @data.delete request_id end |
#flush_all_queue ⇒ Object
:nodoc:
198 199 200 |
# File 'lib/em/protocols/zmq2/req.rb', line 198 def flush_all_queue # :nodoc: flush_queue(true) end |
#flush_queue(even_if_busy = false) ⇒ Object
189 190 191 192 193 194 195 196 |
# File 'lib/em/protocols/zmq2/req.rb', line 189 def flush_queue(even_if_busy = false) until @requests.empty? request_id, request = @requests.first return false unless (request, even_if_busy) @requests.delete(request_id) end true end |
#peer_free(peer, connection) ⇒ Object
202 203 204 205 |
# File 'lib/em/protocols/zmq2/req.rb', line 202 def peer_free(peer, connection) super flush_queue end |
#send_request(message, data) ⇒ Object
180 181 182 183 184 185 186 187 |
# File 'lib/em/protocols/zmq2/req.rb', line 180 def send_request(, data) request = form_request() request_id = request.first @data[request_id] = data if flush_queue && (request) || push_to_queue(request) request_id end end |