Class: Skein::Client::RPC
Constant Summary
collapse
- EXCHANGE_NAME_DEFAULT =
Constants ============================================================
''.freeze
Instance Attribute Summary
#channel, #connection, #context, #ident
Instance Method Summary
collapse
#lock
Constructor Details
#initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil) ⇒ RPC
Instance Methods =====================================================
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/skein/client/rpc.rb', line 13
# Builds the RPC client: declares the exchange requests are published to,
# declares the queue replies are read from, and subscribes a consumer that
# hands each reply to whatever was registered for its id.
#
# @param exchange_name [String, nil] direct exchange to publish to; when nil,
#   EXCHANGE_NAME_DEFAULT is used
# @param routing_key [String, nil] routing key stored for later publishes
# @param connection [Object, nil] forwarded unchanged to the superclass
# @param context [Object, nil] forwarded unchanged to the superclass
def initialize(exchange_name = nil, routing_key: nil, connection: nil, context: nil)
  super(connection: connection, context: context)

  @routing_key = routing_key

  # Pending replies keyed by message id; each value is a Queue or a Proc.
  @callback = {}

  @rpc_exchange = channel.direct(exchange_name || EXCHANGE_NAME_DEFAULT, durable: true)

  # The reply queue is named after @ident (presumably assigned by the
  # superclass constructor -- confirm against the parent class).
  @response_queue = channel.queue(@ident, durable: true, header: true, auto_delete: true)

  @consumer = Skein::Adapter.subscribe(@response_queue, block: false) do |payload, delivery_tag, _reply_to|
    # `self.` is required here: a bare `context` would resolve to the
    # keyword argument of this method rather than the attribute.
    self.context.trap do
      # NOTE(review): JSON.load is documented as intended for trusted input
      # only; consider JSON.parse for payloads coming off the broker.
      reply = JSON.load(payload)

      # A reply is delivered at most once: the entry is removed on lookup.
      handler = @callback.delete(reply['id'])

      if handler.is_a?(Queue)
        # Hands the result to whoever is popping from this Queue.
        handler << reply['result']
      elsif handler.is_a?(Proc)
        # Proc callbacks are invoked without arguments.
        handler.call
      end

      channel.acknowledge(delivery_tag)
    end
  end
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/skein/client/rpc.rb', line 47
# Turns any otherwise-undefined method call into an RPC request published to
# the RPC exchange as a JSON document with "method", "params" and "id" keys.
#
# A single trailing "!" on the method name is stripped before sending and
# marks the call as not expecting a reply: no `reply_to` is set and the call
# does not wait.
#
# The pending callback (or Queue) is stored in @callback *before* the request
# is published, so a reply that arrives immediately cannot be missed.
#
# NOTE(review): combining a block with a "!" name still registers the block
# even though no `reply_to` is sent, so that entry is presumably never
# consumed -- confirm whether this combination should be rejected.
#
# @param name [Symbol, String] remote method name, optionally ending in "!"
# @param args [Array] positional arguments, serialised as the "params" array
# @yield called with no arguments when the registered callback is invoked;
#   when EventMachine is loaded the block is scheduled with
#   EventMachine.next_tick at that point
# @return [Object] the value popped from the reply Queue for a blocking call
#   without a block
# @return [Proc] the registered callback when a block is given
# @return [nil] for a "!" call without a block
def method_missing(name, *args, &block)
  name = name.to_s
  blocking = !name.end_with?('!')
  name = name.chomp('!')

  message_id = SecureRandom.uuid

  request = JSON.dump({
    method: name,
    params: args,
    id: message_id
  })

  callback =
    if block
      if defined?(EventMachine)
        # Defer the block to the reactor only once the reply has arrived.
        lambda { EventMachine.next_tick(&block) }
      else
        lambda { block.call }
      end
    elsif blocking
      Queue.new
    end

  # Register first: a reply can be consumed as soon as publish is called.
  @callback[message_id] = callback if callback

  begin
    @rpc_exchange.publish(
      request,
      routing_key: @routing_key,
      reply_to: blocking ? @ident : nil,
      content_type: 'application/json',
      message_id: message_id
    )
  rescue StandardError
    # Nothing was sent, so no reply will ever claim this entry.
    @callback.delete(message_id)
    raise
  end

  # Only a block-less blocking call waits for the result.
  callback.is_a?(Queue) ? callback.pop : callback
end
|
Instance Method Details
#close ⇒ Object
40
41
42
43
44
45
|
# File 'lib/skein/client/rpc.rb', line 40
# Cancels the reply consumer if one is still set, clears the reference so a
# later call does not cancel it again, then defers to the superclass #close.
#
# @return [Object] whatever the superclass #close returns
def close
  @consumer.cancel if @consumer
  @consumer = nil

  super
end
|