Class: RJR::Nodes::AMQP
Overview
AMQP node definition, implements the RJR::Node interface to listen for and invoke json-rpc requests over the Advanced Message Queuing Protocol.
Clients should specify the amqp broker to connect to when initializing a node and specify the remote queue when invoking requests.
Constant Summary collapse
- RJR_NODE_TYPE =
:amqp- PERSISTENT_NODE =
true- INDIRECT_NODE =
true
Instance Attribute Summary
Attributes inherited from RJR::Node
#connection_event_handlers, #dispatcher, #message_headers, #node_id
Instance Method Summary collapse
-
#initialize(args = {}) ⇒ AMQP
constructor
AMQPNode initializer.
-
#invoke(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for and return response.
-
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
-
#notify(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated).
-
#send_msg(msg, metadata, &on_publish) ⇒ Object
Publish a message using the amqp exchange.
- #to_s ⇒ Object
Methods inherited from RJR::Node
#clear_event_handlers, em, #em, #halt, #indirect?, indirect?, #join, #node_type, #on, persistent?, #persistent?, tp, #tp
Constructor Details
#initialize(args = {}) ⇒ AMQP
AMQPNode initializer
129 130 131 132 133 134 135 136 137 138 |
# File 'lib/rjr/nodes/amqp.rb', line 129 def initialize(args = {}) super(args) @host = args[:host] || args[:broker] @port = args[:port] @vhost = args[:vhost] @user = args[:user] || args[:username] @pass = args[:pass] || args[:password] @ssl = args[:ssl] @amqp_lock = Mutex.new end |
Instance Method Details
#invoke(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for and return response.
Implementation of RJR::Node#invoke
Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/rjr/nodes/amqp.rb', line 185 def invoke(routing_key, rpc_method, *args) = Messages::Request.new :method => rpc_method, :args => args, :headers => @message_headers @@em.schedule do init_node { subscribe # begin listening for result send_msg(.to_s, :routing_key => routing_key, :reply_to => @queue_name) } end # TODO optional timeout for response result = wait_for_result() if result.size > 2 fail result[2] end return result[1] end |
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
Implementation of RJR::Node#listen
164 165 166 167 168 169 170 171 |
# File 'lib/rjr/nodes/amqp.rb', line 164 def listen @@em.schedule do init_node { subscribe # start receiving messages } end self end |
#notify(routing_key, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated)
Implementation of RJR::Node#notif}
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/rjr/nodes/amqp.rb', line 216 def notify(routing_key, rpc_method, *args) # will block until message is published published_l = Mutex.new published_c = ConditionVariable.new invoked = false = Messages::Notification.new :method => rpc_method, :args => args, :headers => @message_headers @@em.schedule do init_node { send_msg(.to_s, :routing_key => routing_key, :reply_to => @queue_name){ published_l.synchronize { invoked = true ; published_c.signal } } } end published_l.synchronize { published_c.wait published_l unless invoked } nil end |
#send_msg(msg, metadata, &on_publish) ⇒ Object
Publish a message using the amqp exchange
Implementation of RJR::Node#send_msg
147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/rjr/nodes/amqp.rb', line 147 def send_msg(msg, , &on_publish) @amqp_lock.synchronize { #raise RJR::Errors::ConnectionError.new("client unreachable") if @disconnected routing_key = [:routing_key] reply_to = [:reply_to] @exchange.publish msg, :routing_key => routing_key, :reply_to => reply_to do |*cargs| on_publish.call unless on_publish.nil? end } nil end |
#to_s ⇒ Object
140 141 142 |
# File 'lib/rjr/nodes/amqp.rb', line 140 def to_s "RJR::Nodes::AMQP<#{@node_id},#{@host},#{@port},#{@vhost},#{@queue_name}>" end |