Class: Firehose::Subscription
- Inherits: Object (Object → Firehose::Subscription)
- Defined in: lib/firehose/subscription.rb
Constant Summary
- TTL = 15000
Instance Attribute Summary
- #subscriber_id ⇒ Object (readonly)
  Globally unique subscription id.
- #ttl ⇒ Object
  The time that a queue should live after the client unsubscribes.
Instance Method Summary
- #initialize(subscriber_id = nil) ⇒ Subscription (constructor)
  A new instance of Subscription.
- #subscribe(path, &block) ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(subscriber_id = nil) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/firehose/subscription.rb', line 15
    def initialize(subscriber_id=nil)
      @subscriber_id = subscriber_id || self.class.subscriber_id
    end
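The constructor keeps an explicit id when one is given and otherwise falls back to a class-level `subscriber_id`. That class method is not shown on this page, so the following is only a minimal sketch of the fallback pattern: `SketchSubscription` is a hypothetical stand-in, and generating the id with `SecureRandom.uuid` is an assumption, not necessarily what Firehose does.

```ruby
require 'securerandom'

# Hypothetical stand-in for Firehose::Subscription; not the library's code.
class SketchSubscription
  attr_reader :subscriber_id

  # ASSUMPTION: the real class-level subscriber_id is not shown on this page.
  # Here it simply returns a fresh random UUID on every call.
  def self.subscriber_id
    SecureRandom.uuid
  end

  def initialize(subscriber_id = nil)
    # Same fallback pattern as the documented constructor.
    @subscriber_id = subscriber_id || self.class.subscriber_id
  end
end

explicit  = SketchSubscription.new('client-42')
generated = SketchSubscription.new

puts explicit.subscriber_id   # the id that was passed in
puts generated.subscriber_id  # a generated UUID
```

Passing an explicit id is what would let a reconnecting client reattach to the same queue, since the queue name in #subscribe is derived from the subscriber id.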
Instance Attribute Details
#subscriber_id ⇒ Object (readonly)
Globally unique subscription id.
    # File 'lib/firehose/subscription.rb', line 13
    def subscriber_id
      @subscriber_id
    end
#ttl ⇒ Object
The time that a queue should live after the client unsubscribes, passed to the queue as its x-expires argument (see #subscribe). This is useful for flaky network connections, such as HTTP long polling or dropped WebSocket connections.
    # File 'lib/firehose/subscription.rb', line 10
    def ttl
      @ttl
    end
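In #subscribe the ttl value is handed to the queue as its `x-expires` argument, which RabbitMQ interprets in milliseconds, so the `TTL` constant of 15000 would correspond to 15 seconds. This page does not show where `ttl` receives a default, so the sketch below assumes a fallback to that constant. `TtlSketch`, `queue_arguments`, and `seconds_to_ms` are hypothetical names used for illustration only.

```ruby
# Hypothetical helpers; not part of Firehose.
module TtlSketch
  DEFAULT_TTL_MS = 15_000 # mirrors the documented TTL constant

  # Builds the :arguments hash that #subscribe passes to the queue.
  # ASSUMPTION: fall back to the default when no ttl has been assigned.
  def self.queue_arguments(ttl = nil)
    { 'x-expires' => ttl || DEFAULT_TTL_MS }
  end

  # Converts seconds to the milliseconds that x-expires expects.
  def self.seconds_to_ms(seconds)
    (seconds * 1000).to_i
  end
end

p TtlSketch.queue_arguments                               # default expiry
p TtlSketch.queue_arguments(TtlSketch.seconds_to_ms(30))  # 30-second expiry
```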
Instance Method Details
#subscribe(path, &block) ⇒ Object
    # File 'lib/firehose/subscription.rb', line 19
    def subscribe(path, &block)
      queue_name = "#{subscriber_id}@#{path}"
      channel    = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1)
      exchange   = AMQP::Exchange.new(channel, :fanout, path, :auto_delete => true)
      queue      = AMQP::Queue.new(channel, queue_name, :arguments => {'x-expires' => ttl})
      queue.bind(exchange)

      # When we get a message, we want to remove the consumer from the queue so that the x-expires
      # ttl starts ticking down. On the reconnect, the consumer connects to the queue and resets the
      # timer on x-expires... in theory at least.
      @consumer = AMQP::Consumer.new(channel, queue, subscriber_id)
      @consumer.on_delivery do |metadata, payload|
        p [:get, subscriber_id, @consumer.consumer_tag, path, payload]
        block.call(payload)
        # The ack needs to go after the block is called. This makes sure that all processing
        # happens downstream before we remove it from the queue entirely.
        metadata.ack
      end.consume
    end
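Two details of #subscribe can be illustrated without a broker: the queue name is built as "<subscriber_id>@<path>", and the acknowledgement is sent only after the caller's block returns. The sketch below is a simplified model under stated assumptions, not the library's implementation: `FakeMetadata`, `queue_name_for`, and `deliver` are hypothetical names, and `FakeMetadata` merely records whether `ack` was called.

```ruby
# Hypothetical stand-in for the delivery metadata object; it only records acks.
class FakeMetadata
  attr_reader :acked

  def initialize
    @acked = false
  end

  def ack
    @acked = true
  end
end

# Queue name as built in #subscribe: "<subscriber_id>@<path>".
def queue_name_for(subscriber_id, path)
  "#{subscriber_id}@#{path}"
end

# Mirrors the body of the on_delivery block: process first, ack afterwards.
def deliver(metadata, payload, &block)
  block.call(payload)
  metadata.ack
end

ok = FakeMetadata.new
deliver(ok, 'hello') { |payload| payload.upcase }

failed = FakeMetadata.new
begin
  deliver(failed, 'boom') { |_payload| raise 'downstream failure' }
rescue RuntimeError
  # The handler raised, so the ack line was never reached.
end

puts queue_name_for('client-42', '/chat') # prints "client-42@/chat"
puts ok.acked                             # prints "true"
puts failed.acked                         # prints "false"
```

In this model, a message whose handler raises stays unacknowledged, which matches the intent of the comment in the source: nothing is removed from the queue until downstream processing has finished.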
#unsubscribe ⇒ Object
    # File 'lib/firehose/subscription.rb', line 39
    def unsubscribe
      @consumer.cancel if @consumer
    end