Class: Skein::Client::Subscriber
Instance Attribute Summary
#connection, #context, #ident
Instance Method Summary
collapse
#channel, #connect, #connection_shared?, #create_channel, #lock, #reconnect, #repeat_until_not_nil
Constructor Details
#initialize(exchange, routing_key = nil, connection: nil, context: nil) ⇒ Subscriber
Instance Methods =====================================================
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# File 'lib/skein/client/subscriber.rb', line 4
def initialize(exchange, routing_key = nil, connection: nil, context: nil)
super(connection: connection, context: context)
@exchange =
case (exchange)
when String, Symbol
self.channel.topic(exchange)
else
exchange
end
@subscribe_queue = self.channel.queue('', exclusive: true, durable: false)
@subscribe_queue.bind(@exchange, routing_key: routing_key)
end
|
Instance Method Details
#close(delete_queue: false) ⇒ Object
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/skein/client/subscriber.rb', line 40
def close(delete_queue: false)
if (delete_queue)
begin
@subscribe_queue.delete
rescue => e
case (e.class.to_s)
when 'Bunny::ChannelAlreadyClosed', 'MarchHare::ChannelAlreadyClosed'
else
raise e
end
end
end
super()
end
|
#listen(block = true) ⇒ Object
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/skein/client/subscriber.rb', line 19
def listen(block = true)
case (@subscribe_queue.class.to_s.split(/::/)[0])
when 'Bunny'
begin
@subscribe_queue.subscribe(block: block) do |delivery_info, properties, payload|
yield(JSON.load(payload), delivery_info, properties)
end
end
when 'MarchHare'
begin
@subscribe_queue.subscribe(block: block) do |metadata, payload|
yield(JSON.load(payload), metadata)
end
rescue MarchHare::ChannelAlreadyClosed
end
else
raise "Unknown queue type #{@subscribe_queue.class}, cannot listen."
end
end
|