Class: RJR::Nodes::AMQP

Inherits:
RJR::Node
Defined in:
lib/rjr/nodes/amqp.rb

Overview

AMQP node definition, implements the RJR::Node interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.

Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.

Examples:

Listening for json-rpc requests over amqp

# initialize node
server = RJR::Nodes::AMQP.new :node_id => 'server', :broker => 'localhost'

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') do |name|
  "Hello #{name}!"
end

# listen, and block
server.listen
server.join

Invoking json-rpc requests over amqp

client = RJR::Nodes::AMQP.new :node_id => 'client', :broker => 'localhost'
puts client.invoke('server-queue', 'hello', 'mo') # the queue name is set to "#{node_id}-queue"

Constant Summary

RJR_NODE_TYPE =
:amqp
PERSISTENT_NODE =
true
INDIRECT_NODE =
true

Instance Attribute Summary

Attributes inherited from RJR::Node

#connection_event_handlers, #dispatcher, #message_headers, #node_id

Instance Method Summary

Methods inherited from RJR::Node

#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp

Constructor Details

#initialize(args = {}) ⇒ AMQP

AMQP node initializer

Parameters:

  • args (Hash) (defaults to: {})

    the options to create the amqp node with

Options Hash (args):

  • :broker (String)

    the amqp message broker to connect to



# File 'lib/rjr/nodes/amqp.rb', line 129

def initialize(args = {})
   super(args)
   @host          = args[:host] || args[:broker]
   @port          = args[:port]
   @vhost         = args[:vhost]
   @user          = args[:user] || args[:username]
   @pass          = args[:pass] || args[:password]
   @ssl           = args[:ssl]
   @amqp_lock     = Mutex.new
end
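
The initializer source reads more keys than the documented :broker option: :host takes precedence over :broker, and :user/:username and :pass/:password act as aliases. The following is a minimal sketch of that precedence. The helper name amqp_connection_options is hypothetical and not part of RJR; it only mirrors the assignments shown above.

```ruby
# Hypothetical helper (not part of RJR) mirroring the precedence used in
# RJR::Nodes::AMQP#initialize: :host wins over :broker, :user over
# :username, and :pass over :password.
def amqp_connection_options(args = {})
  {
    :host  => args[:host] || args[:broker],
    :port  => args[:port],
    :vhost => args[:vhost],
    :user  => args[:user] || args[:username],
    :pass  => args[:pass] || args[:password],
    :ssl   => args[:ssl]
  }
end

# Only :broker and the long-form credentials are given here.
opts = amqp_connection_options(:broker   => 'localhost',
                               :username => 'guest',
                               :password => 'guest')
```

Keys that are not supplied (here :port, :vhost and :ssl) are simply left as nil, as in the initializer itself.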

Instance Method Details

#invoke(routing_key, rpc_method, *args) ⇒ Object

Instructs node to send rpc request, and wait for and return response.

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or a callback, as doing so will block the message subscription used to receive responses.

Parameters:

  • routing_key (String)

    destination queue to send request to

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with

Returns:

  • (Object)

    the json result retrieved from destination converted to a ruby object

Raises:

  • (Exception)

    if the destination raises an exception, it will be converted to json and re-raised here



# File 'lib/rjr/nodes/amqp.rb', line 185

def invoke(routing_key, rpc_method, *args)
  message = Messages::Request.new :method => rpc_method,
                                  :args   => args,
                                  :headers => @message_headers
  @@em.schedule do
    init_node {
      subscribe # begin listening for result
      send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name)
    }
  end

  # TODO optional timeout for response
  result = wait_for_result(message)

  if result.size > 2
    fail result[2]
  end
  return result[1]
end
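
The last lines of #invoke treat the value returned by wait_for_result as an array: the second element is the result, and a third element, if present, is an error to re-raise. A minimal sketch of that unwrapping step follows. The helper unwrap_result is hypothetical, and the meaning of the first element (assumed here to be a message id) is not confirmed by the source.

```ruby
# Hypothetical stand-in for the tail of #invoke. `result` is assumed to be
# [id, value] on success or [id, value, error] on failure; the first
# element is never inspected.
def unwrap_result(result)
  raise result[2] if result.size > 2   # re-raise the remote error locally
  result[1]
end

# Success: only two elements, so the value is returned.
ok = unwrap_result(['msg-1', 'Hello mo!'])

# Failure: a third element is present, so it is raised.
begin
  unwrap_result(['msg-2', nil, RuntimeError.new('remote failure')])
rescue RuntimeError => e
  error_message = e.message
end
```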

#listen ⇒ Object

Instruct Node to start listening for and dispatching rpc requests.

Implementation of RJR::Node#listen



# File 'lib/rjr/nodes/amqp.rb', line 164

def listen
  @@em.schedule do
    init_node {
      subscribe # start receiving messages
    }
  end
  self
end

#notify(routing_key, rpc_method, *args) ⇒ Object

Instructs node to send rpc notification (returns as soon as the message is published; no response is generated)

Implementation of RJR::Node#notify

Parameters:

  • routing_key (String)

    destination queue to send request to

  • rpc_method (String)

    json-rpc method to invoke on destination

  • args (Array)

    array of arguments to convert to json and invoke remote method with



# File 'lib/rjr/nodes/amqp.rb', line 216

def notify(routing_key, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule do
    init_node {
      send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name){
        published_l.synchronize { invoked = true ; published_c.signal }
      }
    }
  end
  published_l.synchronize { published_c.wait published_l unless invoked }
  nil
end
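
#notify blocks the caller until the publish callback fires, using a Mutex, a ConditionVariable and a boolean flag. The sketch below isolates that pattern with a plain Thread standing in for the event loop. The method name wait_until_published and the sleep delay are illustrative assumptions, not part of RJR.

```ruby
# Illustrative only: a worker thread plays the role of the event loop
# that eventually runs the on_publish callback.
def wait_until_published
  lock      = Mutex.new
  cond      = ConditionVariable.new
  published = false

  worker = Thread.new do
    sleep 0.05                                   # pretend publishing takes time
    lock.synchronize { published = true; cond.signal }
  end

  # The flag is checked under the lock, so a signal sent before this
  # point is not lost: the wait is skipped if the flag is already set.
  lock.synchronize { cond.wait(lock) until published }
  worker.join
  published
end
```

This sketch loops with until where the source checks once with unless. That is a common defensive variation and not a claim about how RJR behaves.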

#send_msg(msg, metadata, &on_publish) ⇒ Object

Publish a message using the amqp exchange

Implementation of RJR::Node#send_msg



# File 'lib/rjr/nodes/amqp.rb', line 147

def send_msg(msg, metadata, &on_publish)
  @amqp_lock.synchronize {
    #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected
    routing_key = metadata[:routing_key]
    reply_to    = metadata[:reply_to]
    @exchange.publish msg,
                      :routing_key => routing_key,
                      :reply_to => reply_to do |*cargs|
      on_publish.call unless on_publish.nil?
    end
  }
  nil
end
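
The flow of #send_msg (take the lock, pull :routing_key and :reply_to out of the metadata hash, publish, then run the optional callback) can be exercised without a broker. The sketch below assumes an in-memory stand-in for the exchange. FakeExchange and FakeSender are hypothetical classes written for this illustration; they are not RJR or AMQP library types.

```ruby
# Hypothetical stand-in for an AMQP exchange: records what was published
# and runs the completion block immediately.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(msg, opts = {})
    @published << [msg, opts]
    yield if block_given?
  end
end

# Hypothetical sender reproducing the shape of #send_msg shown above.
class FakeSender
  def initialize(exchange)
    @exchange = exchange
    @lock     = Mutex.new
  end

  def send_msg(msg, metadata, &on_publish)
    @lock.synchronize do
      @exchange.publish(msg,
                        :routing_key => metadata[:routing_key],
                        :reply_to    => metadata[:reply_to]) do
        on_publish.call unless on_publish.nil?
      end
    end
    nil
  end
end

exchange  = FakeExchange.new
sender    = FakeSender.new(exchange)
confirmed = false
ret = sender.send_msg('{"jsonrpc":"2.0"}',
                      :routing_key => 'server-queue',
                      :reply_to    => 'client-queue') { confirmed = true }
```

As in the original method, the return value is always nil; the caller learns about completion only through the block.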

#to_s ⇒ Object



# File 'lib/rjr/nodes/amqp.rb', line 140

def to_s
  "RJR::Nodes::AMQP<#{@node_id},#{@host},#{@port},#{@vhost},#{@queue_name}>"
end