Class: Fluffle::Client
- Inherits: Object
  - Object
  - Fluffle::Client
- Includes: Connectable
- Defined in: lib/fluffle/client.rb
Instance Attribute Summary
- #confirms ⇒ Object (readonly)
  Returns the value of attribute confirms.
- #default_timeout ⇒ Object
  Returns the value of attribute default_timeout.
- #logger ⇒ Object
  Returns the value of attribute logger.
Instance Method Summary
- #call(method, params = [], queue: 'default', raw_response: false, **opts) ⇒ Object
- #confirm_select ⇒ Object
- #describe_payload(payload) ⇒ Object
  Returns a nicely formatted description of a payload with its method name and arity.
- #handle_reply(delivery_info:, properties:, payload:) ⇒ Object
  Fetch and set the IVar with a response from the server.
- #initialize(url: nil, connection: nil, confirms: false) ⇒ Client (constructor)
  A new instance of Client.
- #publish(payload, queue:) ⇒ Object
- #publish_and_wait(payload, queue:, timeout:) ⇒ Object
  Publish a payload to the server and wait (block) for the response.
- #raise_incomplete(payload, event_name) ⇒ Object
  event_name - String describing what we timed out waiting for; should be 'response' or 'confirm'.
- #subscribe ⇒ Object
- #with_confirmation(timeout:) ⇒ Object
  Wraps a block (which should publish a message) with a blocking check that the client received a confirmation from the RabbitMQ server that the message was received and routed successfully.
Methods included from Connectable
#connect, #connected?, included
Constructor Details
#initialize(url: nil, connection: nil, confirms: false) ⇒ Client
Returns a new instance of Client.
# File 'lib/fluffle/client.rb', line 16

def initialize(url: nil, connection: nil, confirms: false)
  self.connect(url || connection)

  @confirms = confirms
  @default_timeout = 5
  @logger = Fluffle.logger

  @uuid = UUIDTools::UUID..to_s
  @channel = @connection.create_channel
  @exchange = @channel.default_exchange
  @reply_queue = @channel.queue Fluffle.response_queue_name(@uuid), exclusive: true

  # Used for generating unique message IDs
  @prng = Random.new

  if confirms
    @pending_confirms = Concurrent::Map.new
    confirm_select
  end

  @pending_responses = Concurrent::Map.new

  subscribe
end
Instance Attribute Details
#confirms ⇒ Object (readonly)
Returns the value of attribute confirms.
# File 'lib/fluffle/client.rb', line 12

def confirms
  @confirms
end
#default_timeout ⇒ Object
Returns the value of attribute default_timeout.
# File 'lib/fluffle/client.rb', line 13

def default_timeout
  @default_timeout
end
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/fluffle/client.rb', line 14

def logger
  @logger
end
Instance Method Details
#call(method, params = [], queue: 'default', raw_response: false, **opts) ⇒ Object
# File 'lib/fluffle/client.rb', line 86

def call(method, params = [], queue: 'default', raw_response: false, **opts)
  # Using `.fetch` here so that we can pass `nil` as the timeout and have
  # it be respected
  timeout = opts.fetch :timeout, self.default_timeout

  id = random_bytes_as_hex 8

  payload = {
    'jsonrpc' => '2.0',
    'id' => id,
    'method' => method,
    'params' => params
  }

  response = publish_and_wait payload, queue: queue,
                                       timeout: timeout

  return response if raw_response

  if response.key? 'result'
    response['result']
  else
    error = response['error'] || {}

    raise Errors::CustomError.new code: error['code'] || 0,
                                  message: error['message'] || "Missing both `result' and `error' on Response object",
                                  data: error['data']
  end
end
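The request building and response unwrapping in #call can be tried without a broker. The following is a minimal sketch using only core Ruby; `RpcError`, `build_request`, `unwrap_response`, and `resolve_timeout` are hypothetical names introduced for illustration (standing in for `Errors::CustomError` and the inline logic above), not part of Fluffle.

```ruby
# Hypothetical stand-in for Fluffle::Errors::CustomError.
class RpcError < StandardError
  attr_reader :code, :data

  def initialize(code:, message:, data: nil)
    super(message)
    @code = code
    @data = data
  end
end

# Build a JSON-RPC 2.0 request hash shaped like the one in #call.
def build_request(id, method, params = [])
  { 'jsonrpc' => '2.0', 'id' => id, 'method' => method, 'params' => params }
end

# Return the 'result' member, or raise using the fields of the 'error' member.
def unwrap_response(response)
  return response['result'] if response.key?('result')

  error = response['error'] || {}
  raise RpcError.new(
    code: error['code'] || 0,
    message: error['message'] || "Missing both `result' and `error' on Response object",
    data: error['data']
  )
end

# Hash#fetch falls back only when the key is absent, so an explicit
# `timeout: nil` is kept instead of being replaced by the default.
def resolve_timeout(opts, default)
  opts.fetch(:timeout, default)
end
```

Because the check is `key?` rather than truthiness, a response whose `'result'` is `nil` still counts as a success.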
#confirm_select ⇒ Object
# File 'lib/fluffle/client.rb', line 54

def confirm_select
  handle_confirm = ->(tag, _multiple, nack) do
    ivar = @pending_confirms.delete tag

    if ivar
      ivar.set nack
    else
      self.logger.error "Missing confirm IVar: tag=#{tag}"
    end
  end

  # Set the channel in confirmation mode so that we can receive confirms
  # of published messages
  @channel.confirm_select handle_confirm
end
#describe_payload(payload) ⇒ Object
Returns a nicely formatted description of a payload: its method name and arity (the number of params, or 0 when there are none), joined by a slash.
# File 'lib/fluffle/client.rb', line 149

def describe_payload(payload)
  method = payload['method']
  arity = (payload['params'] && payload['params'].length) || 0
  "#{method}/#{arity}"
end
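For illustration, here is the same expression as a free-standing method, applied to two sample payloads. The payloads are made up for the example.

```ruby
# Same logic as #describe_payload, outside the class.
def describe_payload(payload)
  method = payload['method']
  arity  = (payload['params'] && payload['params'].length) || 0
  "#{method}/#{arity}"
end

with_params    = describe_payload({ 'method' => 'add', 'params' => [1, 2] })
without_params = describe_payload({ 'method' => 'ping' })

puts with_params    # prints "add/2"
puts without_params # prints "ping/0"
```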
#handle_reply(delivery_info:, properties:, payload:) ⇒ Object
Fetch and set the IVar with a response from the server. This method is called from the reply queue’s background thread; the main thread will normally be waiting for the IVar to be set.
# File 'lib/fluffle/client.rb', line 73

def handle_reply(delivery_info:, properties:, payload:)
  payload = Oj.load payload
  id = payload['id']

  ivar = @pending_responses.delete id

  if ivar
    ivar.set payload
  else
    self.logger.error "Missing pending response IVar: id=#{id || 'null'}"
  end
end
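The lookup by `id` can be sketched with core Ruby alone. This is an illustration under stated assumptions, not the library's code: `dispatch_reply` is a hypothetical name, the standard `json` library replaces Oj, a plain Hash replaces `Concurrent::Map`, and a `Queue` stands in for the IVar.

```ruby
require 'json'

# Hypothetical dispatcher: find the waiter registered under the reply's
# 'id', remove it from the table, and hand it the parsed payload.
def dispatch_reply(pending, raw_payload, errors)
  payload = JSON.parse(raw_payload)
  id      = payload['id']
  waiter  = pending.delete(id)

  if waiter
    waiter << payload
  else
    errors << "Missing pending response: id=#{id || 'null'}"
  end
end

waiter  = Queue.new
pending = { 'abc123' => waiter }
errors  = []

# A reply for a known id reaches its waiter; an unknown id is only reported.
dispatch_reply(pending, '{"jsonrpc":"2.0","id":"abc123","result":42}', errors)
dispatch_reply(pending, '{"jsonrpc":"2.0","id":"zzz","result":0}', errors)
```

Deleting the entry on delivery means a duplicate reply for the same id takes the error path.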
#publish(payload, queue:) ⇒ Object
# File 'lib/fluffle/client.rb', line 180

def publish(payload, queue:)
  opts = {
    routing_key: Fluffle.request_queue_name(queue),
    correlation_id: payload['id'],
    reply_to: @reply_queue.name
  }

  @exchange.publish Oj.dump(payload), opts
end
#publish_and_wait(payload, queue:, timeout:) ⇒ Object
Publishes a payload to the server and waits (blocks) for the response.
It creates an IVar future for the response, stores it in `@pending_responses`, and then publishes the payload to the server. After publishing, it waits for the IVar to be set with the response. It also removes that IVar from `@pending_responses` if the wait times out, to avoid leaking it.
Returns a Hash parsed from the server's JSON response.
Raises Fluffle::Errors::TimeoutError if the server fails to respond within the time given in `timeout:`.
# File 'lib/fluffle/client.rb', line 126

def publish_and_wait(payload, queue:, timeout:)
  id = payload['id']

  response_ivar = Concurrent::IVar.new
  @pending_responses[id] = response_ivar

  if confirms
    with_confirmation(timeout: timeout) { publish payload, queue: queue }
  else
    publish payload, queue: queue
  end

  response = response_ivar.value timeout

  raise_incomplete(payload, 'response') if response_ivar.incomplete?

  return response
ensure
  # Don't leak the `IVar` if it timed out
  @pending_responses.delete id
end
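The register, publish, wait, and clean-up sequence described above can be sketched with core Ruby primitives. Everything here is an assumption for illustration: `OneShot` is a hypothetical single-assignment slot standing in for `Concurrent::IVar`, `WaitTimeout` stands in for `Errors::TimeoutError`, a plain Hash replaces `Concurrent::Map`, and the block passed in plays the role of publishing.

```ruby
# Hypothetical single-assignment slot, standing in for Concurrent::IVar.
class OneShot
  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @set   = false
    @value = nil
  end

  def set(value)
    @mutex.synchronize do
      @value = value
      @set   = true
      @cond.broadcast
    end
  end

  # Block until a value is set or `timeout` seconds pass (nil waits forever).
  def value(timeout = nil)
    now      = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && (now.call + timeout)
    @mutex.synchronize do
      until @set
        remaining = deadline && (deadline - now.call)
        break if remaining && remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @value
    end
  end

  def incomplete?
    @mutex.synchronize { !@set }
  end
end

# Hypothetical stand-in for Fluffle::Errors::TimeoutError.
class WaitTimeout < StandardError; end

# Register a slot under the payload id, publish via the block, then wait.
def publish_and_wait(pending, payload, timeout:)
  id   = payload['id']
  slot = OneShot.new
  pending[id] = slot

  yield payload

  response = slot.value(timeout)
  raise WaitTimeout, "Timed out waiting for response to #{id}" if slot.incomplete?
  response
ensure
  # Runs on success and on timeout, so the table never keeps a stale slot.
  pending.delete(id)
end

pending = {}
reply = publish_and_wait(pending, { 'id' => 'a1' }, timeout: 1) do |payload|
  # Fake server: answer from another thread shortly after "publishing".
  Thread.new do
    sleep 0.05
    pending[payload['id']].set({ 'id' => payload['id'], 'result' => 'pong' })
  end
end
```

Registering the slot before publishing matters: if the order were reversed, a fast reply could arrive before anyone is listed as waiting for it.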
#raise_incomplete(payload, event_name) ⇒ Object
event_name - String describing what we timed out waiting for; should be 'response' or 'confirm'.
# File 'lib/fluffle/client.rb', line 176

def raise_incomplete(payload, event_name)
  raise Errors::TimeoutError.new("Timed out waiting for #{event_name} to `#{describe_payload(payload)}'")
end
#subscribe ⇒ Object
# File 'lib/fluffle/client.rb', line 40

def subscribe
  @reply_queue.subscribe do |delivery_info, properties, payload|
    begin
      self.handle_reply delivery_info: delivery_info,
                        properties: properties,
                        payload: payload
    rescue => err
      # Bunny will let uncaptured errors silently wreck the reply thread,
      # so we must be extra-careful about capturing them
      Fluffle.logger.error "[Fluffle::Client] #{err.class}: #{err.message}\n#{err.backtrace.join("\n")}"
    end
  end
end
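The rescue-and-log guard in #subscribe can be shown in isolation. This is a sketch only: `guarded` is a hypothetical helper, and a plain thread with a `Logger` writing to a `StringIO` stands in for Bunny's consumer thread and `Fluffle.logger`.

```ruby
require 'logger'
require 'stringio'

# Hypothetical wrapper: run a handler and log any StandardError instead of
# letting it escape into the thread that invoked the handler.
def guarded(logger)
  yield
rescue => err
  logger.error "[Fluffle::Client] #{err.class}: #{err.message}\n#{(err.backtrace || []).join("\n")}"
end

log_output = StringIO.new
logger     = Logger.new(log_output)

worker = Thread.new do
  guarded(logger) { raise ArgumentError, 'bad payload' }
  :still_running # reached because the error was captured above
end
```

A bare `rescue => err` catches `StandardError` and its subclasses only, so signals and `SystemExit` still propagate.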
#with_confirmation(timeout:) ⇒ Object
Wraps a block (which should publish a message) with a blocking check
that the client received a confirmation from the RabbitMQ server
that the message was received and routed successfully.
# File 'lib/fluffle/client.rb', line 159

def with_confirmation(timeout:)
  tag = @channel.next_publish_seq_no
  confirm_ivar = Concurrent::IVar.new
  @pending_confirms[tag] = confirm_ivar

  yield

  nack = confirm_ivar.value timeout
  if confirm_ivar.incomplete?
    raise_incomplete payload, 'confirm'
  elsif nack
    raise Errors::NackError.new('Received nack from confirmation')
  end
end
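The interplay between the confirm handler and this wrapper can be sketched against a fake channel. All names here are assumptions for illustration: `FakeChannel` is a hypothetical test double (not Bunny's channel), `NackError` and `ConfirmTimeout` stand in for the `Errors` classes, a `Queue` plus the standard `timeout` library replaces `Concurrent::IVar`, and a plain Hash replaces `Concurrent::Map`.

```ruby
require 'timeout'

class NackError < StandardError; end      # hypothetical stand-in for Errors::NackError
class ConfirmTimeout < StandardError; end # hypothetical stand-in for Errors::TimeoutError

# Hypothetical test double that hands out sequence numbers and lets the
# caller trigger a confirm for a given tag.
class FakeChannel
  attr_reader :next_publish_seq_no

  def initialize
    @next_publish_seq_no = 1
    @handler = nil
  end

  def confirm_select(handler)
    @handler = handler
  end

  # Pretend to publish: consume a sequence number and return it as the tag.
  def publish
    tag = @next_publish_seq_no
    @next_publish_seq_no += 1
    tag
  end

  # Simulate the broker confirming (or nacking) `tag`.
  def confirm(tag, nack: false)
    @handler.call(tag, false, nack)
  end
end

# Register a slot under the upcoming tag, run the publishing block, then
# wait for the handler to deliver the nack flag.
def with_confirmation(channel, pending, timeout:)
  tag  = channel.next_publish_seq_no
  slot = Queue.new
  pending[tag] = slot

  yield

  nack = begin
    Timeout.timeout(timeout) { slot.pop }
  rescue Timeout::Error
    raise ConfirmTimeout, "Timed out waiting for confirm of tag #{tag}"
  end
  raise NackError, 'Received nack from confirmation' if nack
end

channel = FakeChannel.new
pending = {}
channel.confirm_select(lambda do |tag, _multiple, nack|
  slot = pending.delete(tag)
  slot << nack if slot
end)

with_confirmation(channel, pending, timeout: 1) do
  tag = channel.publish
  Thread.new { channel.confirm(tag) } # broker acks from another thread
end
```

Reading the sequence number before yielding is what ties the slot to the message the block publishes, so the block should publish exactly one message.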