Class: Broadcaster

Inherits:
Object
  • Object
show all
Extended by:
ClassConfig
Defined in:
lib/broadcaster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Broadcaster

Returns a new instance of Broadcaster.



15
16
17
18
19
20
21
22
23
24
# File 'lib/broadcaster.rb', line 15

def initialize(options={})
  @id = options.fetch(:id, SecureRandom.uuid)
  @logger = options.fetch(:logger, Broadcaster.logger)
  @redis_client = options.fetch(:redis_client, Broadcaster.redis_client)
  @publisher = @redis_client.new options.fetch(:redis_settings, Broadcaster.redis_settings)
  @subscriber = @redis_client.new options.fetch(:redis_settings, Broadcaster.redis_settings)
  @subscriptions = Hash.new { |h,k| h[k] = {} }
  @mutex = Mutex.new
  listen
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



13
14
15
# File 'lib/broadcaster.rb', line 13

def id
  @id
end

Instance Method Details

#publish(channel, message) ⇒ Object



26
27
28
29
# File 'lib/broadcaster.rb', line 26

def publish(channel, message)
  logger.debug(self.class) { "Published | #{scoped(channel)} | #{message}" }
  publisher.call 'PUBLISH', scoped(channel), Marshal.dump(message)
end

#subscribe(channel, callable = nil, &block) ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/broadcaster.rb', line 31

def subscribe(channel, callable=nil, &block)
  mutex.synchronize do
    SecureRandom.uuid.tap do |subscription_id|
      logger.debug(self.class) { "Subscribed | #{scoped(channel)} | #{subscription_id}" }
      subscriptions[scoped(channel)][subscription_id] = callable || block
    end
  end
end

#unsubscribe(subscription_id) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/broadcaster.rb', line 40

def unsubscribe(subscription_id)
  mutex.synchronize do
    channel, _ = subscriptions.detect { |k,v| v.key? subscription_id }
    if channel
      logger.debug(self.class) { "Unsubscribed | #{channel} | #{subscription_id}" }
      block = subscriptions[channel].delete subscription_id
      subscriptions.delete_if { |k,v| v.empty? }
      block
    end
  end
end

#unsubscribe_allObject



52
53
54
55
56
57
# File 'lib/broadcaster.rb', line 52

def unsubscribe_all
  mutex.synchronize do
    logger.debug(self.class) { 'Unsubscribed all' }
    subscriptions.clear
  end
end