Class: Fluffle::Client

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil, confirms: false) ⇒ Client

Returns a new instance of Client.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluffle/client.rb', line 16

# Connects to the broker and prepares everything needed to issue requests:
# a channel, its default exchange, an exclusive reply queue and the maps
# that track in-flight requests.
#
# url:        - Handed to `connect` when present
# connection: - Handed to `connect` only when `url:` is nil/false
# confirms:   - When truthy the channel is put in confirm mode (see
#               `confirm_select`) and pending confirms are tracked
def initialize(url: nil, connection: nil, confirms: false)
  # NOTE(review): `connect` comes from Connectable and presumably assigns
  # `@connection`, which is read just below - confirm against that module
  connect(url || connection)

  @logger          = Fluffle.logger
  @default_timeout = 5
  @confirms        = confirms

  @uuid     = UUIDTools::UUID.timestamp_create.to_s
  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  # The reply queue is named after this client's UUID and is exclusive
  reply_queue_name = Fluffle.response_queue_name(@uuid)
  @reply_queue     = @channel.queue(reply_queue_name, exclusive: true)

  # Used for generating unique message IDs
  @prng = Random.new

  @pending_responses = Concurrent::Map.new

  if confirms
    # The map must exist before the confirm handler is registered, since
    # the handler looks tags up in it
    @pending_confirms = Concurrent::Map.new
    confirm_select
  end

  subscribe
end

Instance Attribute Details

#confirms ⇒ Object (readonly)

Returns the value of attribute confirms.



12
13
14
# File 'lib/fluffle/client.rb', line 12

# Reader for `@confirms`, the flag `initialize` stores from its `confirms:`
# argument. `publish_and_wait` consults it to decide whether a publish is
# wrapped in `with_confirmation`.
def confirms
  @confirms
end

#default_timeout ⇒ Object

Returns the value of attribute default_timeout.



13
14
15
# File 'lib/fluffle/client.rb', line 13

# Reader for `@default_timeout`, which `initialize` sets to 5. `call` uses
# it as the timeout whenever the caller does not pass a `:timeout` option.
def default_timeout
  @default_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



14
15
16
# File 'lib/fluffle/client.rb', line 14

# Reader for `@logger`, which `initialize` sets to `Fluffle.logger`. The
# reply and confirm handlers use it to report messages they cannot match
# to a pending IVar.
def logger
  @logger
end

Instance Method Details

#call(method, params = [], queue: 'default', raw_response: false, **opts) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluffle/client.rb', line 86

# Performs a JSON-RPC 2.0 request and blocks until the response arrives.
#
# method       - Value placed under 'method' in the request
# params       - Value placed under 'params' in the request (default: [])
# queue:       - Name of the queue the request is published to
# raw_response: - When truthy, return the whole response Hash untouched
# opts         - Only `:timeout` is read; passing `timeout: nil` overrides
#                the default instead of falling back to it
#
# Returns the response's 'result' value (or the whole response when
# `raw_response:` is truthy)
# Raises Errors::CustomError when the response has no 'result' key
def call(method, params = [], queue: 'default', raw_response: false, **opts)
  # `fetch` rather than `||` so that an explicit `nil` timeout is honoured
  wait_for = opts.fetch(:timeout, default_timeout)

  request = {
    'jsonrpc' => '2.0',
    'id'      => random_bytes_as_hex(8),
    'method'  => method,
    'params'  => params
  }

  reply = publish_and_wait(request, queue: queue, timeout: wait_for)

  return reply if raw_response
  return reply['result'] if reply.key?('result')

  # No 'result': surface whatever error details are present, with fallbacks
  failure = reply['error'] || {}

  raise Errors::CustomError.new(
    code: failure['code'] || 0,
    message: failure['message'] || "Missing both `result' and `error' on Response object",
    data: failure['data']
  )
end

#confirm_select ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluffle/client.rb', line 54

# Puts the channel in confirmation mode and registers the callback that
# resolves pending confirm IVars.
#
# The callback receives the publish tag, a "multiple" flag (ignored here)
# and a nack flag. It removes the IVar stored under that tag in
# `@pending_confirms` and sets it to the nack flag; an unknown tag is
# logged as an error.
def confirm_select
  on_confirm = lambda do |tag, _multiple, nack|
    future = @pending_confirms.delete(tag)
    next future.set(nack) if future

    logger.error "Missing confirm IVar: tag=#{tag}"
  end

  # Confirmation mode is what makes the broker send confirms for
  # published messages
  @channel.confirm_select(on_confirm)
end

#describe_payload(payload) ⇒ Object

Returns a nicely formatted description of a payload, made of its method name and arity (for example `foo/2`).


149
150
151
152
153
154
# File 'lib/fluffle/client.rb', line 149

# Builds a short "method/arity" label for a request payload.
#
# payload - Hash with a 'method' entry and an optional 'params' entry
#
# Returns a String such as "foo/2"; the arity is the length of 'params',
# or 0 when 'params' is absent
def describe_payload(payload)
  params = payload['params']
  arity  = params ? params.length : 0

  [payload['method'], arity].join('/')
end

#handle_reply(delivery_info:, properties:, payload:) ⇒ Object

Fetch and set the IVar with a response from the server. This method is called from the reply queue’s background thread; the main thread will normally be waiting for the IVar to be set.



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluffle/client.rb', line 73

# Resolves the pending IVar that matches a response from the server.
#
# delivery_info: - Unused here
# properties:    - Unused here
# payload:       - Raw response body; parsed with `Oj.load`
#
# The parsed response's 'id' selects the IVar in `@pending_responses`.
# The IVar is removed from the map and set to the parsed response; when
# no IVar is registered under that id an error is logged instead.
def handle_reply(delivery_info:, properties:, payload:)
  response    = Oj.load(payload)
  response_id = response['id']

  future = @pending_responses.delete(response_id)

  unless future
    return logger.error("Missing pending response IVar: id=#{response_id || 'null'}")
  end

  future.set(response)
end

#publish(payload, queue:) ⇒ Object



180
181
182
183
184
185
186
187
188
# File 'lib/fluffle/client.rb', line 180

# Serializes a request payload and publishes it on the exchange.
#
# payload - Hash to publish; its 'id' becomes the message's correlation id
# queue:  - Queue name, mapped to a routing key through
#           `Fluffle.request_queue_name`
#
# The message's reply-to is the name of this client's reply queue.
def publish(payload, queue:)
  routing_key = Fluffle.request_queue_name(queue)

  properties = {
    routing_key: routing_key,
    correlation_id: payload['id'],
    reply_to: @reply_queue.name
  }

  @exchange.publish(Oj.dump(payload), properties)
end

#publish_and_wait(payload, queue:, timeout:) ⇒ Object

Publish a payload to the server and wait (block) for the response

It creates an IVar future for the response, stores that in `@pending_responses`, and then publishes the payload to the server. After publishing it waits for the IVar to be set with the response. It also clears that IVar if it times out to avoid leaking.

Returns a Hash parsed from the server's JSON response. Raises Fluffle::Errors::TimeoutError if the server fails to respond within the time given in `timeout:`.


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluffle/client.rb', line 126

# Publishes a payload and blocks until its response arrives.
#
# An IVar is registered in `@pending_responses` under the payload's 'id'
# before publishing; the reply handler sets it when the response comes in.
# When `confirms` is truthy the publish is wrapped in `with_confirmation`.
#
# payload  - Request Hash; must carry the 'id' used to match the response
# queue:   - Queue name forwarded to `publish`
# timeout: - Forwarded to `with_confirmation` and to `IVar#value`
#
# Returns the value the IVar was set to
# Raises (via `raise_incomplete`) when the IVar is still incomplete after
# waiting
def publish_and_wait(payload, queue:, timeout:)
  request_id = payload['id']

  future = Concurrent::IVar.new
  @pending_responses[request_id] = future

  if confirms
    with_confirmation(timeout: timeout) do
      publish(payload, queue: queue)
    end
  else
    publish(payload, queue: queue)
  end

  reply = future.value(timeout)
  return reply unless future.incomplete?

  raise_incomplete(payload, 'response')
ensure
  # Whatever happened above, never leave the IVar behind in the map
  @pending_responses.delete(request_id)
end

#raise_incomplete(payload, event_name) ⇒ Object

event_name - String describing what we timed out waiting for, should

be 'response' or 'confirm'


176
177
178
# File 'lib/fluffle/client.rb', line 176

# Raises the timeout error for a request that never completed.
#
# payload    - Request Hash, described through `describe_payload`
# event_name - String describing what we timed out waiting for, should
#              be 'response' or 'confirm'
#
# Raises Errors::TimeoutError, always
def raise_incomplete(payload, event_name)
  description = describe_payload(payload)
  message     = "Timed out waiting for #{event_name} to `#{description}'"

  raise Errors::TimeoutError.new(message)
end

#subscribe ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluffle/client.rb', line 40

# Starts consuming the reply queue, handing every delivery to
# `handle_reply`.
#
# Any StandardError raised while handling a delivery is caught and logged
# through `Fluffle.logger` (class, message and backtrace) rather than
# being allowed to escape the subscription block.
def subscribe
  @reply_queue.subscribe do |delivery_info, properties, payload|
    begin
      handle_reply(delivery_info: delivery_info, properties: properties, payload: payload)
    rescue => error
      # Bunny will let uncaptured errors silently wreck the reply thread,
      # so nothing may escape this block
      trace = error.backtrace.join("\n")
      Fluffle.logger.error "[Fluffle::Client] #{error.class}: #{error.message}\n#{trace}"
    end
  end
end

#with_confirmation(timeout:) ⇒ Object

Wraps a block (which should publish a message) with a blocking check that the client received a confirmation from the RabbitMQ server that the message was received and routed successfully.


159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluffle/client.rb', line 159

# Wraps a block (which should publish a message) with a blocking check
# that the RabbitMQ server confirmed the message.
#
# The channel's next publish sequence number is read *before* the block
# runs and used as the key for a fresh IVar in `@pending_confirms`; the
# confirm handler installed by `confirm_select` sets that IVar to the
# nack flag it receives.
#
# timeout: - Passed to `IVar#value` while waiting for the confirm
# payload: - Optional request Hash; only used to describe the request in
#            the timeout error (default: nil)
#
# Returns nil when the message was confirmed without a nack
# Raises Errors::TimeoutError if the IVar is still incomplete after waiting
# Raises Errors::NackError if the IVar was set to a truthy nack flag
def with_confirmation(timeout:, payload: nil)
  tag = @channel.next_publish_seq_no
  confirm_ivar = Concurrent::IVar.new
  @pending_confirms[tag] = confirm_ivar

  yield

  nack = confirm_ivar.value timeout
  if confirm_ivar.incomplete?
    # `raise_incomplete` needs a payload to describe; callers that do not
    # supply one still get a TimeoutError, just with a generic message
    raise_incomplete payload, 'confirm' if payload
    raise Errors::TimeoutError.new('Timed out waiting for confirm')
  elsif nack
    raise Errors::NackError.new('Received nack from confirmation')
  end
ensure
  # Don't leak the `IVar` if we timed out or the block raised; this is a
  # no-op when the confirm handler already removed it
  @pending_confirms.delete tag if tag
end