Class: RJR::Nodes::AMQP
Overview
AMQP node definition, implements the RJR::Node interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.
Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.
Constant Summary collapse
- RJR_NODE_TYPE =
:amqp- PERSISTENT_NODE =
true
Instance Attribute Summary
Attributes inherited from RJR::Node
#dispatcher, #message_headers, #node_id
Instance Method Summary collapse
-
#initialize(args = {}) ⇒ AMQP
constructor
AMQPNode initializer.
-
#invoke(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for and return response.
-
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
-
#notify(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated).
-
#send_msg(msg, metadata, &on_publish) ⇒ Object
Publish a message using the amqp exchange.
- #to_s ⇒ Object
Methods inherited from RJR::Node
#clear_event_handlers, em, #halt, #join, #node_type, #on, #persistent?, persistent?, tp
Constructor Details
#initialize(args = {}) ⇒ AMQP
AMQPNode initializer
117 118 119 120 121 |
# File 'lib/rjr/nodes/amqp.rb', line 117 def initialize(args = {}) super(args) @broker = args[:broker] @amqp_lock = Mutex.new end |
Instance Method Details
#invoke(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for and return response.
Implementation of RJR::Node#invoke
Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/rjr/nodes/amqp.rb', line 168 def invoke(routing_key, rpc_method, *args) = RequestMessage.new :method => rpc_method, :args => args, :headers => @@em.schedule do init_node { subscribe # begin listening for result send_msg(.to_s, :routing_key => routing_key, :reply_to => @queue_name) } end # TODO optional timeout for response result = wait_for_result() if result.size > 2 raise Exception, result[2] end return result[1] end |
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
Implementation of RJR::Node#listen
147 148 149 150 151 152 153 154 |
# File 'lib/rjr/nodes/amqp.rb', line 147 def listen @@em.schedule do init_node { subscribe # start receiving messages } end self end |
#notify(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated)
Implementation of RJR::Node#notify
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/rjr/nodes/amqp.rb', line 199 def notify(routing_key, rpc_method, *args) # will block until message is published published_l = Mutex.new published_c = ConditionVariable.new invoked = false = NotificationMessage.new :method => rpc_method, :args => args, :headers => @@em.schedule do init_node { send_msg(.to_s, :routing_key => routing_key, :reply_to => @queue_name){ published_l.synchronize { invoked = true ; published_c.signal } } } end published_l.synchronize { published_c.wait published_l unless invoked } nil end |
#send_msg(msg, metadata, &on_publish) ⇒ Object
Publish a message using the amqp exchange
Implementation of RJR::Node#send_msg
130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rjr/nodes/amqp.rb', line 130 def send_msg(msg, , &on_publish) @amqp_lock.synchronize { #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected routing_key = [:routing_key] reply_to = [:reply_to] @exchange.publish msg, :routing_key => routing_key, :reply_to => reply_to do |*cargs| on_publish.call unless on_publish.nil? end } nil end |
#to_s ⇒ Object
123 124 125 |
# File 'lib/rjr/nodes/amqp.rb', line 123 def to_s "RJR::Nodes::AMQP<#{@node_id},#{@broker},#{@queue_name}>" end |