Class: Firehose::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/subscription.rb

Constant Summary collapse

TTL =
15000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscriber_id = nil) ⇒ Subscription

Returns a new instance of Subscription.



15
16
17
# File 'lib/firehose/subscription.rb', line 15

# Builds a subscription identified by +subscriber_id+.
#
# @param subscriber_id [Object, nil] id to use for this subscription; when it
#   is nil (or false) the id is obtained from the class-level +subscriber_id+
#   method instead.
def initialize(subscriber_id=nil)
  if subscriber_id
    @subscriber_id = subscriber_id
  else
    # No id supplied by the caller: ask the class for one.
    @subscriber_id = self.class.subscriber_id
  end
end

Instance Attribute Details

#subscriber_id ⇒ Object (readonly)

Globally unique subscription id



13
14
15
# File 'lib/firehose/subscription.rb', line 13

# Reader for the id stored by the constructor in @subscriber_id.
#
# @return [Object] the subscription id; #subscribe uses it as the prefix of
#   the queue name and as the consumer tag.
def subscriber_id
  @subscriber_id
end

#ttl ⇒ Object

The time that a queue should live after the client unsubscribes. This is useful for flaky network connections, such as HTTP long polling or broken WebSockets.



10
11
12
# File 'lib/firehose/subscription.rb', line 10

# How long the queue should live after the client unsubscribes. #subscribe
# passes this value as the queue's 'x-expires' argument.
# NOTE(review): presumably milliseconds, per RabbitMQ's x-expires — confirm.
#
# Falls back to the class default TTL when no value has been assigned, so
# that #subscribe never declares the queue with a nil 'x-expires'.
#
# @return [Integer] the assigned ttl, or TTL when none has been set
def ttl
  @ttl || TTL
end

Instance Method Details

#subscribe(path, &block) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/firehose/subscription.rb', line 19

# Declares a queue named "<subscriber_id>@<path>", binds it to the fanout
# exchange named +path+, and starts a consumer that hands every delivered
# payload to +block+.
#
# The consumer is kept in @consumer so that #unsubscribe can cancel it.
#
# @param path [String] name of the fanout exchange; also the suffix of the
#   queue name
# @yield [payload] invoked once for each delivered message
# @return [Object] the result of calling +consume+ on the consumer
def subscribe(path, &block)
  queue_name  = "#{subscriber_id}@#{path}"
  channel     = AMQP::Channel.new(Firehose.amqp.connection).prefetch(1)
  exchange    = AMQP::Exchange.new(channel, :fanout, path, :auto_delete => true)
  queue       = AMQP::Queue.new(channel, queue_name, :arguments => {'x-expires' => ttl})
  queue.bind(exchange)

  # When we get a message, we want to remove the consumer from the queue so that the x-expires
  # ttl starts ticking down. On the reconnect, the consumer connects to the queue and resets the
  # timer on x-expires... in theory at least.
  @consumer = AMQP::Consumer.new(channel, queue, subscriber_id)
  @consumer.on_delivery do |metadata, payload|
    p [:get, subscriber_id, @consumer.consumer_tag, path, payload]
    block.call(payload)
    # The ack needs to go after the block is called. This makes sure that all processing
    # happens downstream before we remove it from the queue entirely.
    metadata.ack
  end.consume
end

#unsubscribe ⇒ Object



39
40
41
# File 'lib/firehose/subscription.rb', line 39

# Cancels the consumer created by #subscribe, if there is one.
#
# @return [Object, nil] the result of the consumer's +cancel+, or nil when no
#   consumer has been set
def unsubscribe
  # Nothing to cancel when no consumer has been stored.
  return unless @consumer

  @consumer.cancel
end