Class: RJR::Nodes::AMQP

Inherits:
RJR::Node
Defined in:
lib/rjr/nodes/amqp.rb

Overview

AMQP node definition. Implements the RJR::Node interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.

Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.

Examples:

Listening for json-rpc requests over amqp

# initialize node
server = RJR::Nodes::AMQP.new :node_id => 'server', :broker => 'localhost'

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') do |name|
  "Hello #{name}!"
end

# listen, and block
server.listen
server.join

Invoking json-rpc requests over amqp

client = RJR::Nodes::AMQP.new :node_id => 'client', :broker => 'localhost'
puts client.invoke('server-queue', 'hello', 'mo') # the queue name is set to "#{node_id}-queue"

Constant Summary

RJR_NODE_TYPE = :amqp

Instance Attribute Summary

Attributes inherited from RJR::Node

#dispatcher, #message_headers, #node_id

Instance Method Summary

Methods inherited from RJR::Node

em, #halt, #join, #node_type, #on, tp

Constructor Details

#initialize(args = {}) ⇒ AMQP

AMQP node initializer

Options Hash (args):

  • :broker (String)

    the amqp message broker to connect to



# File 'lib/rjr/nodes/amqp.rb', line 116

def initialize(args = {})
   super(args)
   @broker        = args[:broker]
   @amqp_lock     = Mutex.new
end

Instance Method Details

#invoke(routing_key, rpc_method, *args) ⇒ Object

Instructs node to send an rpc request, then wait for and return the response.

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or from a callback, as doing so will block the message subscription used to receive responses.

Raises:

  • (Exception)

    if the destination raises an exception, it will be converted to json and re-raised here



# File 'lib/rjr/nodes/amqp.rb', line 167

def invoke(routing_key, rpc_method, *args)
  message = RequestMessage.new :method => rpc_method,
                               :args   => args,
                               :headers => @message_headers
  @@em.schedule do
    init_node {
      subscribe # begin listening for result
      send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name)
    }
  end

  # TODO optional timeout for response
  result = wait_for_result(message)

  if result.size > 2
    raise Exception, result[2]
  end
  return result[1]
end
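
The tail of `invoke` treats the value returned by `wait_for_result` as an array: the second element is the result, and a third element, when present, carries an error. The sketch below isolates that convention. `unpack_result` is a hypothetical helper, not part of RJR, and the first element being a message id is an assumption. It raises `RuntimeError` rather than `Exception` so that a plain `rescue` catches it.

```ruby
# Hypothetical helper mirroring the tail of #invoke; not part of RJR.
# Assumes `result` is [id, value] on success or [id, value, error] on failure.
def unpack_result(result)
  # A third element signals that the remote side reported an error.
  raise RuntimeError, result[2].to_s if result.size > 2
  result[1]
end

# Success: only two elements, so the value is returned.
ok = unpack_result(['msg-1', 'Hello mo!'])

# Failure: the third element is re-raised locally.
failure = begin
  unpack_result(['msg-2', nil, 'method not found'])
rescue RuntimeError => e
  e.message
end
```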

#listen ⇒ Object

Instruct Node to start listening for and dispatching rpc requests.

Implementation of RJR::Node#listen



# File 'lib/rjr/nodes/amqp.rb', line 146

def listen
  @@em.schedule do
    init_node {
      subscribe # start receiving messages
    }
  end
  self
end

#notify(routing_key, rpc_method, *args) ⇒ Object

Instructs node to send an rpc notification. Returns as soon as the message is published; no response is generated.

Implementation of RJR::Node#notify



# File 'lib/rjr/nodes/amqp.rb', line 198

def notify(routing_key, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  message = NotificationMessage.new :method => rpc_method,
                                    :args   => args,
                                    :headers => @message_headers
  @@em.schedule do
    init_node {
      send_msg(message.to_s, :routing_key => routing_key, :reply_to => @queue_name){
        published_l.synchronize { invoked = true ; published_c.signal }
      }
    }
  end
  published_l.synchronize { published_c.wait published_l unless invoked }
  nil
end
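
`notify` blocks its caller with a `Mutex` and `ConditionVariable` until the publish callback fires on the event loop thread. The following is a minimal, self-contained sketch of that handshake using only the Ruby standard library. `publish_and_wait` and its `delay` parameter are hypothetical, and a plain thread stands in for the event loop.

```ruby
# Hypothetical stand-in for the wait in #notify; not part of RJR.
# Blocks the caller until a callback on another thread reports completion.
def publish_and_wait(delay = 0.05)
  lock      = Mutex.new
  published = ConditionVariable.new
  invoked   = false

  # Stand-in for the event loop thread running the on_publish block.
  worker = Thread.new do
    sleep delay # simulate publish latency
    lock.synchronize do
      invoked = true
      published.signal
    end
  end

  # Test the flag while holding the lock, so a signal sent before this point
  # is not missed; `until` also covers spurious wakeups.
  lock.synchronize { published.wait(lock) until invoked }

  worker.join
  invoked
end

publish_and_wait # returns once the worker has signalled
```

Checking the flag under the lock before waiting matters: if the callback ran first, an unconditional `wait` would never be woken.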

#send_msg(msg, metadata, &on_publish) ⇒ Object

Publish a message using the amqp exchange

Implementation of RJR::Node#send_msg



# File 'lib/rjr/nodes/amqp.rb', line 129

def send_msg(msg, metadata, &on_publish)
  @amqp_lock.synchronize {
    #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected
    routing_key = metadata[:routing_key]
    reply_to    = metadata[:reply_to]
    @exchange.publish msg,
                      :routing_key => routing_key,
                      :reply_to => reply_to do |*cargs|
      on_publish.call unless on_publish.nil?
    end
  }
  nil
end
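
The shape of `send_msg` (lock, read routing details from the metadata hash, publish, then run the optional callback) can be exercised without a broker. The sketch below does so under stated assumptions: `FakeExchange` and `Sender` are hypothetical stand-ins, not RJR or amqp gem classes, and `FakeExchange#publish` confirms publication synchronously, which a real exchange would not necessarily do.

```ruby
# Hypothetical stand-in for an AMQP exchange: records messages instead of
# sending them, then yields to confirm publication.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(msg, opts = {})
    @published << [msg, opts]
    yield if block_given?
  end
end

# Sketch of the send_msg pattern: serialize access with a lock, pull routing
# details from the metadata hash, run the optional callback after publishing.
class Sender
  def initialize(exchange)
    @exchange = exchange
    @lock     = Mutex.new
  end

  def send_msg(msg, metadata, &on_publish)
    @lock.synchronize do
      @exchange.publish(msg,
                        :routing_key => metadata[:routing_key],
                        :reply_to    => metadata[:reply_to]) do
        on_publish.call unless on_publish.nil?
      end
    end
    nil
  end
end

exchange  = FakeExchange.new
confirmed = false
Sender.new(exchange).send_msg('{"method":"hello"}',
                              :routing_key => 'server-queue',
                              :reply_to    => 'client-queue') { confirmed = true }
```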

#to_s ⇒ Object



# File 'lib/rjr/nodes/amqp.rb', line 122

def to_s
  "RJR::Nodes::AMQP<#{@node_id},#{@broker},#{@queue_name}>"
end