Class: Skein::Client::Subscriber

Inherits:
Skein::Connected show all
Defined in:
lib/skein/client/subscriber.rb

Instance Attribute Summary

Attributes inherited from Skein::Connected

#connection, #context, #ident

Instance Method Summary collapse

Methods inherited from Skein::Connected

#channel, #connect, #connection_shared?, #create_channel, #lock, #reconnect, #repeat_until_not_nil

Constructor Details

#initialize(exchange, routing_key = nil, connection: nil, context: nil) ⇒ Subscriber

Instance Methods =====================================================



4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/skein/client/subscriber.rb', line 4

def initialize(exchange, routing_key = nil, connection: nil, context: nil)
  super(connection: connection, context: context)

  @exchange =
    case (exchange)
    when String, Symbol
      self.channel.topic(exchange)
    else
      exchange
    end

  @subscribe_queue = self.channel.queue('', exclusive: true, durable: false)
  @subscribe_queue.bind(@exchange, routing_key: routing_key)
end

Instance Method Details

#close(delete_queue: false) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/skein/client/subscriber.rb', line 40

def close(delete_queue: false)
  if (delete_queue)
    begin
      @subscribe_queue.delete
    rescue => e
      case (e.class.to_s)
      when 'Bunny::ChannelAlreadyClosed', 'MarchHare::ChannelAlreadyClosed'
        # Tried to delete, but this has already been shut down
      else
        raise e
      end
    end
  end

  super()
end

#listen(block = true) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/skein/client/subscriber.rb', line 19

def listen(block = true)
  case (@subscribe_queue.class.to_s.split(/::/)[0])
  when 'Bunny'
    begin
      @subscribe_queue.subscribe(block: block) do |delivery_info, properties, payload|
        yield(JSON.load(payload), delivery_info, properties)
      end
    end
  when 'MarchHare'
    begin
      @subscribe_queue.subscribe(block: block) do |, payload|
        yield(JSON.load(payload), )
      end
    rescue MarchHare::ChannelAlreadyClosed
      # Connection got killed outside of thread, so shut-down and move on
    end
  else
    raise "Unknown queue type #{@subscribe_queue.class}, cannot listen."
  end
end