Class: ZMQMachine::Device::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/zm/devices/queue.rb

Overview

Used in conjunction with REQ/REP sockets to load balance the requests and replies over (potentially) multiple backends.

The basic mechanics are that the program contains 1 (or more) clients that talk to 1 (or more) backends that all perform the same work. Connecting to an intermediate queue device allows for the client requests to be fair-balanced among the available backend servers. The hidden identities passed along by REQ/REP sockets are used by the queue device’s internal XREQ/XREP sockets to route the messages back to the appropriate client.

Example:

# the queue creates sockets and binds to both given addresses; all messages get
# routed between the two
queue = ZM::Device::Queue.new reactor, "tcp://192.168.0.100:5050", "tcp://192.168.0.100:5051"

# the +client_handler+ internally calls "connect" to the incoming address given above
client = reactor.req_socket client_handler
client2 = reactor.req_socket client_handler

# the +server_handler+ internally calls "connect" to the outgoing address given above
server = reactor.rep_socket server_handler

Defined Under Namespace

Classes: Handler

Instance Method Summary collapse

Constructor Details

#initialize(reactor, incoming, outgoing, verbose = false) ⇒ Queue

Takes either a properly formatted string that can be converted into a ZM::Address or takes a ZM::Address directly.

Routes all messages received by either address to the other address.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/zm/devices/queue.rb', line 103

def initialize reactor, incoming, outgoing, verbose = false
  incoming = Address.from_string incoming if incoming.kind_of? String
  outgoing = Address.from_string outgoing if outgoing.kind_of? String

  # setup the handlers for processing messages
  @handler_in = Handler.new reactor, incoming, verbose, :in
  @handler_out = Handler.new reactor, outgoing, verbose, :out

  # create each socket and pass in the appropriate handler
  @incoming = reactor.xrep_socket @handler_in
  @outgoing = reactor.xreq_socket @handler_out

  # set each handler's outgoing socket
  @handler_in.socket_out = @outgoing
  @handler_out.socket_out = @incoming
end