Class: Pubsubstub::Subscriber

Inherits:
Object
  • Object
show all
Includes:
Mutex_m, Logging
Defined in:
lib/pubsubstub/subscriber.rb

Instance Method Summary collapse

Methods included from Logging

#debug, #error, #info

Constructor Details

#initialize ⇒ Subscriber

Returns a new instance of Subscriber.



6
7
8
9
10
# File 'lib/pubsubstub/subscriber.rb', line 6

# Builds a subscriber that starts out unsubscribed and with no registered
# listeners.
#
# The parent initializer is invoked first so that any included module that
# hooks into construction gets a chance to set itself up.
def initialize
  super()
  # Flag reported by #subscribed?.
  @subscribed = false
  # Listener registry, keyed by channel key.
  @listeners = {}
end

Instance Method Details

#add_event_listener(channel_key, callback) ⇒ Object



16
17
18
19
20
21
# File 'lib/pubsubstub/subscriber.rb', line 16

# Registers +callback+ for +channel_key+, creating the channel's listener set
# on first use. The registry is only touched while holding the lock.
#
# @param channel_key [Object] key identifying the channel
# @param callback [Object] listener to register
# @return [Boolean] true when the callback was newly added, false when it was
#   already registered for that channel
def add_event_listener(channel_key, callback)
  synchronize do
    channel_listeners = (@listeners[channel_key] ||= Set.new)
    # Set#add? yields nil for an element that is already present.
    !channel_listeners.add?(callback).nil?
  end
end

#remove_event_listener(channel_key, callback) ⇒ Object



23
24
25
26
27
28
# File 'lib/pubsubstub/subscriber.rb', line 23

# Unregisters +callback+ from +channel_key+ while holding the lock.
#
# @param channel_key [Object] key identifying the channel
# @param callback [Object] listener to remove
# @return [Boolean, nil] nil when nothing was ever registered for the channel;
#   otherwise true if the callback was removed and false if it was not present
def remove_event_listener(channel_key, callback)
  synchronize do
    channel_listeners = @listeners[channel_key]
    # Set#delete? yields nil when the element was not in the set.
    !channel_listeners.delete?(callback).nil? if channel_listeners
  end
end

#start ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/pubsubstub/subscriber.rb', line 38

# Subscribes to +pubsub_pattern+ via +redis.psubscribe+ and wires up the
# subscription callbacks:
#
# * on subscribe   - logs and marks the subscriber as subscribed
# * on unsubscribe - logs and marks the subscriber as unsubscribed
# * on message     - forwards the key and payload to +process_message+
#
# Presumably this call blocks for the lifetime of the subscription (the
# comment in #stop refers to a caller blocked on reading the socket).
#
# However the call ends, "Terminated" is logged and the subscribed flag is
# cleared, so #subscribed? never keeps reporting true after this method exits.
#
# @return [Object] the value returned by +redis.psubscribe+
# @raise whatever +redis.psubscribe+ or +process_message+ raises; errors are
#   not rescued here
def start
  redis.psubscribe(pubsub_pattern) do |on|
    on.psubscribe do
      info { "Subscribed to #{pubsub_pattern}" }
      @subscribed = true
    end

    on.punsubscribe do
      info { "Unsubscribed from #{pubsub_pattern}" }
      @subscribed = false
    end

    # The matched pattern itself is not needed, only the concrete key.
    on.pmessage do |_pattern, pubsub_key, message|
      process_message(pubsub_key, message)
    end
  end
ensure
  # When psubscribe exits by raising, the punsubscribe callback never runs;
  # clear the flag here so it cannot stay stuck at true.
  @subscribed = false
  info { "Terminated" }
end

#stop ⇒ Object



30
31
32
33
34
35
36
# File 'lib/pubsubstub/subscriber.rb', line 30

# Sends a +punsubscribe+ command for +pubsub_pattern+ while holding this
# object's lock.
#
# @return [Object] whatever the underlying client's +call+ returns
def stop
  synchronize do
    # Going through redis.client.call bypasses the client mutex. That is safe
    # because we know the only other possible caller is blocked reading the
    # socket.
    raw_client = redis.client
    raw_client.call(['punsubscribe', pubsub_pattern])
  end
end

#subscribed? ⇒ Boolean

Returns:

  • (Boolean)


12
13
14
# File 'lib/pubsubstub/subscriber.rb', line 12

# Reports the current value of the subscribed flag. The flag is set to false
# by #initialize and toggled by the subscribe/unsubscribe callbacks that
# #start registers.
#
# @return [Boolean] the subscribed flag
def subscribed?
  @subscribed
end