Class: Fluffle::Client

Inherits:
Object
  • Object
show all
Includes:
Connectable
Defined in:
lib/fluffle/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Connectable

#connect, #connected?, included

Constructor Details

#initialize(url: nil, connection: nil) ⇒ Client

Returns a new instance of Client.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/fluffle/client.rb', line 15

# Sets up a client: connects, opens a channel, declares an exclusive reply
# queue named after a freshly generated UUID, and starts consuming replies.
#
# @param url [Object, nil] handed to `connect` when truthy
# @param connection [Object, nil] handed to `connect` when `url` is not given
def initialize(url: nil, connection: nil)
  # NOTE(review): `connect` comes from Connectable and presumably assigns
  # `@connection`, which is read below; confirm against the mixin.
  connect(url || connection)

  @default_timeout = 5
  @logger          = Fluffle.logger

  @uuid     = UUIDTools::UUID.timestamp_create.to_s
  @channel  = @connection.create_channel
  @exchange = @channel.default_exchange

  # The reply queue is private to this client instance
  reply_queue_name = Fluffle.response_queue_name(@uuid)
  @reply_queue     = @channel.queue(reply_queue_name, exclusive: true)

  # Source of randomness for unique message IDs
  @prng = Random.new

  # Response futures keyed by request ID; written here and by the reply handler
  @pending_responses = Concurrent::Map.new

  subscribe
end

Instance Attribute Details

#default_timeout ⇒ Object

Returns the value of attribute default_timeout.



12
13
14
# File 'lib/fluffle/client.rb', line 12

# Returns the timeout that `call` falls back to when no `:timeout` option is
# given. The constructor initialises it to 5.
# NOTE(review): presumably seconds, since it is handed to the IVar wait; confirm.
def default_timeout
  @default_timeout
end

#logger ⇒ Object

Returns the value of attribute logger.



13
14
15
# File 'lib/fluffle/client.rb', line 13

# Returns the logger used by this client. The constructor initialises it from
# `Fluffle.logger`; the reply handler uses it to report unmatched responses.
def logger
  @logger
end

Instance Method Details

#call(method, params = [], queue: 'default', **opts) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluffle/client.rb', line 58

# Invokes a remote method with a JSON-RPC 2.0 request and blocks until the
# response arrives (or the timeout expires inside `publish_and_wait`).
#
# @param method [String] name of the remote method
# @param params [Array] arguments sent as the request's `params` member
# @param queue [String] queue name forwarded to `publish_and_wait`
# @param opts [Hash] `:timeout` overrides `default_timeout`; an explicit
#   `nil` is respected rather than replaced by the default
# @return [Object] the response's `result` member, which may legitimately be
#   `false` or `nil`
# @raise [Errors::CustomError] if the response carries an `error` member, or
#   has neither `result` nor `error`
def call(method, params = [], queue: 'default', **opts)
  # Using `.fetch` here so that we can pass `nil` as the timeout and have
  # it be respected
  timeout = opts.fetch :timeout, default_timeout

  payload = {
    'jsonrpc' => '2.0',
    'id'      => random_bytes_as_hex(8),
    'method'  => method,
    'params'  => params
  }

  response = publish_and_wait payload, queue: queue, timeout: timeout

  result = response['result']
  error  = response['error']

  # A falsy result (`false`/`null`) is still a success: what matters is that
  # the `result` member is present and no error was reported. Testing only
  # the truthiness of the value would turn such results into errors.
  return result if result || (error.nil? && response.key?('result'))

  error ||= {}

  raise Errors::CustomError.new code: error['code'] || 0,
                                message: error['message'] || "Missing both `result' and `error' on Response object",
                                data: error['data']
end

#handle_reply(delivery_info:, properties:, payload:) ⇒ Object

Fetch and set the IVar with a response from the server. This method is called from the reply queue’s background thread; the main thread will normally be waiting for the IVar to be set.



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluffle/client.rb', line 45

# Resolves the pending future that matches an incoming reply.
#
# Runs on the reply queue's consumer thread while the calling thread is
# normally blocked waiting on the future. The future is removed from
# `@pending_responses` before it is set; a reply with no matching future is
# logged as an error.
#
# @param delivery_info [Object] delivery metadata (unused here)
# @param properties [Object] message properties (unused here)
# @param payload [String] JSON-encoded response body
def handle_reply(delivery_info:, properties:, payload:)
  response    = Oj.load payload
  response_id = response['id']

  pending = @pending_responses.delete response_id
  return pending.set(response) if pending

  logger.error "Missing pending response IVar: id=#{response_id || 'null'}"
end

#publish(payload, queue:) ⇒ Object



118
119
120
121
122
123
124
125
126
# File 'lib/fluffle/client.rb', line 118

# Serialises a request and publishes it on the exchange.
#
# The message is routed to the request queue derived from `queue`, tagged
# with the request's `id` as correlation ID, and tells the server to reply
# to this client's reply queue.
#
# @param payload [Hash] request object; its `'id'` entry is used for correlation
# @param queue [String] logical queue name, mapped via `Fluffle.request_queue_name`
# @return [Object] whatever the exchange's `publish` returns
def publish(payload, queue:)
  routing_key = Fluffle.request_queue_name(queue)

  properties = {
    routing_key: routing_key,
    correlation_id: payload['id'],
    reply_to: @reply_queue.name
  }

  @exchange.publish(Oj.dump(payload), properties)
end

#publish_and_wait(payload, queue:, timeout:) ⇒ Object

Publish a payload to the server and wait (block) for the response

It creates an IVar future for the response, stores it in `@pending_responses`, and then publishes the payload to the server. After publishing, it waits for the IVar to be set with the response. If the wait times out, it also removes that IVar so it is not leaked.

Returns a Hash parsed from the server's JSON response.

Raises Fluffle::Errors::TimeoutError if the server fails to respond within the time given in `timeout:`.


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluffle/client.rb', line 96

# Publishes a request and blocks until its response arrives.
#
# A future is registered in `@pending_responses` under the request's ID
# before publishing, so the reply handler can find it. The entry is always
# removed on the way out, including on timeout or error.
#
# @param payload [Hash] request object; must carry the request ID under `'id'`
# @param queue [String] queue name forwarded to `publish`
# @param timeout [Numeric, nil] how long to wait; passed straight to the
#   future's `value`
# @return [Hash] the response the reply handler set on the future
# @raise [Errors::TimeoutError] if the future is still incomplete after waiting
def publish_and_wait(payload, queue:, timeout:)
  id = payload['id']

  ivar = Concurrent::IVar.new
  @pending_responses[id] = ivar

  publish payload, queue: queue

  # Block until the reply handler fulfils the future or the timeout elapses.
  # The value returned here is deliberately ignored: on expiry it is `nil`,
  # and the future may still be fulfilled right afterwards.
  ivar.value timeout

  if ivar.incomplete?
    method = payload['method']
    params = payload['params']
    arity  = params ? params.length : 0
    raise Errors::TimeoutError.new("Timed out waiting for response to `#{method}/#{arity}'")
  end

  # The future is complete at this point, so this read returns the response
  # immediately, even if it was set just after the timed wait gave up.
  ivar.value
ensure
  # Don't leak the `IVar` if it timed out
  @pending_responses.delete id
end

#subscribe ⇒ Object



34
35
36
37
38
39
40
# File 'lib/fluffle/client.rb', line 34

# Starts consuming the reply queue, forwarding every delivery to
# `handle_reply` as keyword arguments.
#
# @return [Object] whatever the reply queue's `subscribe` returns
def subscribe
  @reply_queue.subscribe do |info, props, body|
    handle_reply(delivery_info: info, properties: props, payload: body)
  end
end