Class: Fluffle::Client
Instance Method Summary
collapse
#connect, #connected?, included
Constructor Details
#initialize(url:) ⇒ Client
Returns a new instance of Client.
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/fluffle/client.rb', line 12
# Builds a client: calls #connect with the given URL, then prepares the
# channel, default exchange and a per-client reply queue before starting
# to consume responses.
#
# @param url [Object] passed straight through to #connect
def initialize(url:)
  connect(url)

  # Unique identifier used to derive this client's reply queue name.
  @uuid = UUIDTools::UUID.timestamp_create.to_s

  # Plain in-process state, independent of the connection.
  @prng = Random.new
  @pending_responses = Concurrent::Map.new

  @channel = @connection.create_channel
  @exchange = @channel.default_exchange

  reply_queue_name = Fluffle.response_queue_name(@uuid)
  @reply_queue = @channel.queue(reply_queue_name, exclusive: true)

  subscribe
end
|
Instance Method Details
#call(method, params = [], queue: 'default') ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/fluffle/client.rb', line 43
# Publishes a JSON-RPC 2.0 request and waits on a Concurrent::IVar until
# #handle_resposne fills it with the matching response.
#
# @param method [String] name of the remote method to invoke
# @param params [Array] arguments sent as the request's `params` member
# @param queue [String] queue name handed to Fluffle.request_queue_name
# @return [Object] the `result` member of the response (may be false/nil)
# @raise [RuntimeError] if the response has an `error` member or lacks
#   a `result` member
def call(method, params = [], queue: 'default')
  id = random_bytes_as_hex 8

  payload = {
    'jsonrpc' => '2.0',
    'id'      => id,
    'method'  => method,
    'params'  => params
  }

  # Register the IVar BEFORE publishing: a response that arrives right
  # after the publish must find its pending entry, otherwise it would be
  # dropped and this call would wait forever.
  ivar = Concurrent::IVar.new
  @pending_responses[id] = ivar

  begin
    @exchange.publish Oj.dump(payload), routing_key: Fluffle.request_queue_name(queue),
                                        correlation_id: id,
                                        reply_to: @reply_queue.name
  rescue StandardError
    # Nothing was sent, so no response will ever clear this entry.
    @pending_responses.delete id
    raise
  end

  response = ivar.value
  error = response['error']

  # Use key presence rather than truthiness so that a legitimate `false`
  # or `null` result is returned instead of being treated as a failure.
  return response['result'] if error.nil? && response.key?('result')

  detail =
    if error.is_a?(Hash)
      "#{error['message']} (code #{error['code']})"
    elsif error
      error.inspect
    else
      'response contained neither result nor error'
    end

  # TODO: raise a dedicated error subclass once one exists for clients.
  raise "Fluffle call to #{method.inspect} failed: #{detail}"
end
|
#handle_resposne(delivery_info:, properties:, payload:) ⇒ Object
36
37
38
39
40
41
|
# File 'lib/fluffle/client.rb', line 36
# Parses a response delivered to the reply queue and hands it to the
# IVar registered under the response's `id`, if there is one.
#
# @param delivery_info [Object] delivery metadata from the consumer (unused)
# @param properties [Object] message properties from the consumer (unused)
# @param payload [String] serialized response body, parsed with Oj.load
# @return [Object, nil] the result of IVar#set, or nil when no call is
#   waiting on that id
def handle_resposne(delivery_info:, properties:, payload:)
  payload = Oj.load payload

  ivar = @pending_responses.delete payload['id']

  # A response with an unknown id (duplicate or already-handled delivery)
  # has no waiter; ignore it rather than raising NoMethodError on nil.
  return unless ivar

  ivar.set payload
end
|
#subscribe ⇒ Object
28
29
30
31
32
33
34
|
# File 'lib/fluffle/client.rb', line 28
# Registers a consumer block on the reply queue; each delivery is passed
# on to #handle_resposne as keyword arguments.
#
# @return [Object] whatever @reply_queue.subscribe returns
def subscribe
  @reply_queue.subscribe do |info, props, body|
    handle_resposne(delivery_info: info, properties: props, payload: body)
  end
end
|