Class: Fluffle::Client

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil, confirms: false, mandatory: false) ⇒ Client

Returns a new instance of Client.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluffle/client.rb', line 17

# Builds a client: connects, opens a channel with an exclusive reply queue,
# optionally enables publisher confirms and return handling, and finally
# starts consuming replies.
#
# url        - Passed to `connect` when truthy.
# connection - Passed to `connect` instead when `url` is nil/false.
# confirms   - When truthy, a `Fluffle::Confirmer` is created for the channel
#              and `confirm_select` is called on it.
# mandatory  - When truthy, `handle_returns` is called to install a return
#              callback; the flag is also stored for later use.
def initialize(url: nil, connection: nil, confirms: false, mandatory: false)
  # NOTE(review): `connect` is not shown here; it is presumably what assigns
  # the `@connection` used below — confirm against Connectable.
  connect(url || connection)

  @logger          = Fluffle.logger
  @default_timeout = 5
  @mandatory       = mandatory
  @confirms        = confirms

  # The UUID is only used to derive this client's reply queue name.
  @uuid     = UUIDTools::UUID.timestamp_create.to_s
  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  reply_queue_name = Fluffle.response_queue_name(@uuid)
  @reply_queue     = @channel.queue(reply_queue_name, exclusive: true)

  # Used for generating unique message IDs
  @prng = Random.new

  if confirms
    @confirmer = Fluffle::Confirmer.new(channel: @channel)
    @confirmer.confirm_select
  end

  handle_returns if mandatory

  # The map must exist before `subscribe` starts delivering replies, because
  # `handle_reply` looks pending responses up in it.
  @pending_responses = Concurrent::Map.new
  subscribe
end

Instance Attribute Details

#confirms ⇒ Object (readonly)

Returns the value of attribute confirms.



12
13
14
# File 'lib/fluffle/client.rb', line 12

# Reader for the `confirms` flag given to the constructor (stored in
# `@confirms`). `publish_and_wait` checks it to decide whether publishing is
# wrapped in `@confirmer.with_confirmation`.
def confirms
  @confirms
end

#default_timeout ⇒ Object

Returns the value of attribute default_timeout.



14
15
16
# File 'lib/fluffle/client.rb', line 14

# Timeout that `call` falls back to when the caller passes no `:timeout`
# option. The constructor sets it to 5.
# NOTE(review): presumably seconds, since it ends up in `IVar#value` — confirm.
def default_timeout
  @default_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



15
16
17
# File 'lib/fluffle/client.rb', line 15

# Logger held in `@logger`; the constructor initialises it from
# `Fluffle.logger`. `handle_reply` uses it to report replies that match no
# pending response.
def logger
  @logger
end

#mandatory ⇒ Object (readonly)

Returns the value of attribute mandatory.



13
14
15
# File 'lib/fluffle/client.rb', line 13

# Reader for the `mandatory` flag given to the constructor (stored in
# `@mandatory`). `publish` forwards the stored value as the `mandatory:`
# publish option.
def mandatory
  @mandatory
end

Instance Method Details

#call(method, params = [], queue: 'default', raw_response: false, **opts) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/fluffle/client.rb', line 90

# Sends a JSON-RPC 2.0 request and blocks until its response arrives.
#
# method       - Value placed under the 'method' key of the request.
# params       - Value placed under the 'params' key (default: []).
# queue        - Queue name forwarded to `publish_and_wait` (default: 'default').
# raw_response - When truthy, the response object is returned untouched.
# opts         - Only `:timeout` is read; it overrides `default_timeout`.
#
# Returns the response's 'result' value, or the whole response when
# `raw_response` is truthy.
# Raises Errors::CustomError when the response carries no 'result' key, built
# from the response's 'error' member (or placeholder values if that is absent
# too).
def call(method, params = [], queue: 'default', raw_response: false, **opts)
  # `fetch` (not `||`) so that an explicit `timeout: nil` is passed through
  # rather than being replaced by the default.
  timeout = opts.fetch(:timeout, default_timeout)

  request = {
    'jsonrpc' => '2.0',
    'id'      => random_bytes_as_hex(8),
    'method'  => method,
    'params'  => params
  }

  reply = publish_and_wait(request, queue: queue, timeout: timeout)

  return reply if raw_response
  return reply['result'] if reply.key?('result')

  failure = reply['error'] || {}

  raise Errors::CustomError.new(
    code: failure['code'] || 0,
    message: failure['message'] || "Missing both `result' and `error' on Response object",
    data: failure['data']
  )
end

#describe_payload(payload) ⇒ Object

Returns a nicely formatted description of a payload with its method name and arity.


166
167
168
169
170
171
# File 'lib/fluffle/client.rb', line 166

# Formats a request payload as "<method>/<arity>", where arity is the length
# of the payload's 'params' value, or 0 when 'params' is missing.
#
# payload - Hash-like object read via the 'method' and 'params' keys.
#
# Returns a String.
def describe_payload(payload)
  params = payload['params']
  arity  = (params.length if params) || 0

  "#{payload['method']}/#{arity}"
end

#handle_reply(delivery_info:, properties:, payload:) ⇒ Object

Fetch and set the `IVar` with a response from the server. This method is called from the reply queue's background thread; the main thread will normally be waiting for the `IVar` to be set.



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluffle/client.rb', line 77

# Routes a reply to whoever is waiting for it.
#
# The raw payload is parsed with `Oj.load`, its 'id' is used to remove the
# matching entry from `@pending_responses`, and that entry is `set` with the
# parsed reply. If nothing is pending under that id, an error is logged.
#
# delivery_info - Unused here.
# properties    - Unused here.
# payload       - Raw reply body handed to `Oj.load`.
def handle_reply(delivery_info:, properties:, payload:)
  reply   = Oj.load(payload)
  id      = reply['id']
  waiting = @pending_responses.delete(id)

  return logger.error("Missing pending response IVar: id=#{id || 'null'}") unless waiting

  waiting.set(reply)
end

#handle_returns ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluffle/client.rb', line 60

# Installs an `on_return` callback on the exchange.
#
# When a message is returned, the callback removes the pending response
# registered under the message's `:correlation_id` and, if one was found,
# sets it to a `Fluffle::Errors::ReturnError` describing the return. Returns
# for ids that are not pending are ignored.
def handle_returns
  @exchange.on_return do |return_info, properties, _payload|
    waiting = @pending_responses.delete(properties[:correlation_id])
    next unless waiting

    text = Kernel.format(
      "Received return from exchange for routing key `%s' (%d %s)",
      return_info.routing_key,
      return_info.reply_code,
      return_info.reply_text
    )

    waiting.set(Fluffle::Errors::ReturnError.new(text))
  end
end

#publish(payload, queue:) ⇒ Object



173
174
175
176
177
178
179
180
181
182
# File 'lib/fluffle/client.rb', line 173

# Serialises a request with `Oj.dump` and publishes it on the exchange.
#
# payload - Request object; its 'id' is sent as the correlation id.
# queue   - Name passed to `Fluffle.request_queue_name` to obtain the
#           routing key.
#
# The message also carries this client's reply queue name as `reply_to` and
# the stored `@mandatory` flag.
def publish(payload, queue:)
  routing_key = Fluffle.request_queue_name(queue)

  properties = {
    routing_key: routing_key,
    correlation_id: payload['id'],
    reply_to: @reply_queue.name,
    mandatory: @mandatory
  }

  @exchange.publish(Oj.dump(payload), properties)
end

#publish_and_wait(payload, queue:, timeout:) ⇒ Object

Publish a payload to the server and wait (block) for the response

It creates an `IVar` future for the response, stores that in `@pending_responses`, and then publishes the payload to the server. After publishing it waits for the `IVar` to be set with the response. It also clears that `IVar` if it times out to avoid leaking.

Returns a `Hash` parsed from the JSON response from the server. Raises `Fluffle::Errors::TimeoutError` if the server failed to respond within the given time in `timeout:`.


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/fluffle/client.rb', line 130

# Publishes a request and blocks until its response arrives or the timeout
# elapses.
#
# A fresh `Concurrent::IVar` is registered in `@pending_responses` under the
# payload's 'id' before publishing, so that whichever callback receives the
# reply (or an exchange return) can complete it. When `confirms` is truthy the
# publish is wrapped in `@confirmer.with_confirmation`.
#
# payload - Request object; read via 'id' here and handed to `publish`.
# queue   - Queue name forwarded to `publish`.
# timeout - Passed to `with_confirmation` and to `IVar#wait`.
#
# Returns the value the IVar was completed with.
# Raises Errors::TimeoutError if the IVar is still incomplete after waiting.
# Raises the stored value itself when it is a StandardError (exchange returns
# complete the IVar with an error).
def publish_and_wait(payload, queue:, timeout:)
  id = payload['id']

  response_ivar = Concurrent::IVar.new
  @pending_responses[id] = response_ivar

  stack = Fluffle::MiddlewareStack.new

  if confirms
    stack.push(->(publish) { @confirmer.with_confirmation(timeout: timeout, &publish) })
  end

  stack.call do
    publish payload, queue: queue
  end

  # Wait, check completion, and only then read the value. Reading the value
  # first and checking `incomplete?` afterwards leaves a window in which the
  # reply thread can complete the IVar between the two calls, making this
  # method return nil instead of the response or a timeout.
  response_ivar.wait timeout

  if response_ivar.incomplete?
    raise Errors::TimeoutError, "Timed out waiting for response to `#{describe_payload(payload)}'"
  end

  response = response_ivar.value

  # Exchange returns will preempt the response and set it to an error
  # that we can raise
  raise response if response.is_a? StandardError

  response
ensure
  # Don't leak the `IVar` if it timed out
  @pending_responses.delete id
end

#subscribe ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluffle/client.rb', line 46

# Starts consuming the reply queue, handing each delivery to `handle_reply`.
#
# Any StandardError raised while handling a delivery is logged through
# `Fluffle.logger` (class, message and backtrace) instead of propagating.
def subscribe
  @reply_queue.subscribe do |info, props, body|
    begin
      handle_reply(delivery_info: info, properties: props, payload: body)
    rescue StandardError => failure
      # Bunny will let uncaptured errors silently wreck the reply thread,
      # so we must be extra-careful about capturing them
      trace = failure.backtrace.join("\n")
      Fluffle.logger.error "[Fluffle::Client] #{failure.class}: #{failure.message}\n#{trace}"
    end
  end
end