Class: EventMachine::Protocols::Zmq2::Req

Inherits:
PreReq
Defined in:
lib/em/protocols/zmq2/req.rb

Overview

A ZMQ socket that acts like REQ. It also reacts to connection busyness, which makes it a bit smarter than a plain ZMQ REQ socket. It generates unique request ids and uses the ZMQ routing scheme to map replies to requests.

The only visible change from PreReq is that #send_request returns a falsy value less often, because a request that cannot be sent immediately is pushed to a queue first.

Note that when subclassing, you should override #receive_reply rather than #receive_message, and use #send_request instead of #send_message.

class MyReq < EM::Protocols::Zmq2::Req
  def receive_reply(message, data, request_id)
    puts "received message #{message} and stored data #{data}"
  end
end

req = MyReq.new
if request_id = req.send_request('hi', 'ho')
  puts "Message sent"
end

class TimeoutedReq < EM::Protocols::Zmq2::PreReq
  def initialize(opts={})
    super
    @timeout = opts[:timeout] || 1
  end
  def cancel_request(request_id)
    data = super
    EM.cancel_timer(data[:timer])  if data[:timer]
    data[:data]
  end
  def receive_reply(message, data)
    if (timer = data[:timer])
      EM.cancel_timer(timer)
    end
    puts "receive message #{message.inspect}, associated data #{data[:data].inspect}"
  end
  def send_request(message, data)
    data = {data: data}
    if request_id = super(message, data)
      data[:timer] = EM.add_timer(@timeout){ cancel_request(request_id) }
    end
    request_id
  end
end
req = TimeoutedReq.new
req.bind('ipc://req')
callback_data = { some_data: "" }
if request_id = req.send_request(['hello', 'world'], callback_data)
  puts "Request sent with request_id #{request_id}"
else
  puts "No free peers and highwatermark reached"
end

Direct Known Subclasses

ReqCb, ReqDefer

Instance Method Summary

Methods inherited from PreReq

#form_request, #receive_message, #receive_reply

Methods inherited from PreDealer

#choose_peer, #receive_message, #receive_message_and_peer, #send_message

Constructor Details

#initialize(opts = {}) ⇒ Req

Returns a new instance of Req.



# File 'lib/em/protocols/zmq2/req.rb', line 167

def initialize(opts = {})
  super
  @requests = {}
end

Instance Method Details

#cancel_request(request_id) ⇒ Object

Cancels a pending request, so that its callback is not called when a reply arrives.

Returns:

  • the data associated with request_id



# File 'lib/em/protocols/zmq2/req.rb', line 175

def cancel_request(request_id)
  @requests.delete request_id
  @data.delete request_id
end
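
The return value described above follows from Hash#delete, which returns the removed value. A minimal plain-Ruby sketch, assuming only the two hashes visible in the listing; ToyTracker and its #track helper are hypothetical names used for illustration and are not part of the library:

```ruby
# Toy stand-in for the two tables touched by #cancel_request.
# ToyTracker and #track are hypothetical names, not part of the library.
class ToyTracker
  def initialize
    @requests = {} # request_id => queued request, not yet sent
    @data = {}     # request_id => data supplied by the caller
  end

  # Hypothetical helper that records a request the way the toy needs it.
  def track(request_id, request, data)
    @requests[request_id] = request
    @data[request_id] = data
  end

  # Same body as the listing above.
  def cancel_request(request_id)
    @requests.delete request_id
    @data.delete request_id # Hash#delete returns the removed value, or nil
  end
end

tracker = ToyTracker.new
tracker.track('id-1', ['id-1', 'hi'], { note: 'first' })

cancelled = tracker.cancel_request('id-1') # the stored data comes back
again = tracker.cancel_request('id-1')     # nothing left to remove, so nil
```

Cancelling the same id twice is harmless in this sketch: the second call finds nothing and returns nil.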

#flush_all_queue ⇒ Object

:nodoc:



# File 'lib/em/protocols/zmq2/req.rb', line 198

def flush_all_queue # :nodoc:
  flush_queue(true)
end

#flush_queue(even_if_busy = false) ⇒ Object



# File 'lib/em/protocols/zmq2/req.rb', line 189

def flush_queue(even_if_busy = false)
  until @requests.empty?
    request_id, request = @requests.first
    return false  unless send_message(request, even_if_busy)
    @requests.delete(request_id)
  end
  true
end
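
The drain loop above sends queued requests in insertion order and stops at the first send that fails, leaving the rest queued. A minimal plain-Ruby sketch of that behaviour, assuming a fake transport with a fixed capacity; ToyFlusher, #enqueue, and the capacity counter are hypothetical and only stand in for the real peer handling:

```ruby
# Toy stand-in for the drain loop in #flush_queue; not the library class.
class ToyFlusher
  attr_reader :requests, :sent

  def initialize(capacity)
    @capacity = capacity # how many sends the fake peer accepts (assumption)
    @requests = {}       # request_id => request, kept in insertion order
    @sent = []
  end

  # Hypothetical helper that puts a request into the queue.
  def enqueue(request_id, request)
    @requests[request_id] = request
  end

  # Fake transport: accepts a message only while capacity remains,
  # unless even_if_busy forces it through.
  def send_message(request, even_if_busy = false)
    return false if @capacity.zero? && !even_if_busy
    @capacity -= 1 if @capacity > 0
    @sent << request
    true
  end

  # Same shape as the listing above.
  def flush_queue(even_if_busy = false)
    until @requests.empty?
      request_id, request = @requests.first
      return false unless send_message(request, even_if_busy)
      @requests.delete(request_id)
    end
    true
  end
end

flusher = ToyFlusher.new(2)
flusher.enqueue(1, %w[a])
flusher.enqueue(2, %w[b])
flusher.enqueue(3, %w[c])

drained = flusher.flush_queue        # stops once the fake peer is busy
left_over = flusher.requests.keys    # ids still waiting in the queue
forced = flusher.flush_queue(true)   # what #flush_all_queue does
```

In this sketch the first call sends two requests and leaves the third queued; the forced call then empties the queue.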

#peer_free(peer, connection) ⇒ Object



202
203
204
205
# File 'lib/em/protocols/zmq2/req.rb', line 202

def peer_free(peer, connection)
  super
  flush_queue
end

#send_request(message, data) ⇒ Object



# File 'lib/em/protocols/zmq2/req.rb', line 180

def send_request(message, data)
  request = form_request(message)
  request_id = request.first
  @data[request_id] = data
  if flush_queue && send_message(request) || push_to_queue(request)
    request_id
  end
end
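
Because && binds tighter than ||, the condition above reads as "(queue drained and message sent) or message queued". A minimal plain-Ruby sketch of that decision follows. ToyReq, the peer_free flag, the hwm limit, and the bodies of #form_request, #send_message, and #push_to_queue are all assumptions made for illustration; the real implementations are not shown on this page.

```ruby
# Toy model of the send-or-queue decision; not the library class.
class ToyReq
  attr_reader :queue, :sent

  def initialize(peer_free:, hwm:)
    @peer_free = peer_free # whether the fake peer accepts messages (assumption)
    @hwm = hwm             # hypothetical queue limit (high-water mark)
    @queue = {}
    @sent = []
    @data = {}
    @next_id = 0
  end

  # Assumed shape: the request id is the first frame of the request.
  def form_request(message)
    @next_id += 1
    ["id-#{@next_id}", *message]
  end

  def send_message(request, even_if_busy = false)
    return false unless @peer_free || even_if_busy
    @sent << request
    true
  end

  # Assumed behaviour: refuse once the queue holds hwm requests.
  def push_to_queue(request)
    return false if @queue.size >= @hwm
    @queue[request.first] = request
    true
  end

  def flush_queue(even_if_busy = false)
    until @queue.empty?
      request_id, request = @queue.first
      return false unless send_message(request, even_if_busy)
      @queue.delete(request_id)
    end
    true
  end

  # Same logic as the listing above, with the precedence made explicit.
  def send_request(message, data)
    request = form_request(message)
    request_id = request.first
    @data[request_id] = data
    if (flush_queue && send_message(request)) || push_to_queue(request)
      request_id
    end
  end
end

busy = ToyReq.new(peer_free: false, hwm: 1)
first = busy.send_request('hi', :a)  # cannot send, so it is queued
second = busy.send_request('ho', :b) # cannot send, queue full, so nil

idle = ToyReq.new(peer_free: true, hwm: 1)
direct = idle.send_request('hi', :a) # sent straight away
```

Note that the method returns nil rather than false when a request is neither sent nor queued, so callers should test the result for truthiness, as the overview examples do.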