Class: AnyCable::Rack::BroadcastSubscribers::RedisSubscriber

Inherits:
BaseSubscriber
  • Object
show all
Defined in:
lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb

Overview

Redis Pub/Sub subscriber

Constant Summary

Constants included from Logging

Logging::PREFIX

Instance Attribute Summary collapse

Attributes inherited from BaseSubscriber

#coder, #hub

Instance Method Summary collapse

Constructor Details

#initialize(hub:, coder:, channel:, **options) ⇒ RedisSubscriber

Returns a new instance of RedisSubscriber.



15
16
17
18
19
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 15

# Builds a subscriber bound to a single Redis Pub/Sub channel.
#
# All received keyword arguments are forwarded unchanged to the parent
# initializer by the bare +super+ call.
#
# @param hub [Object] forwarded to the parent initializer
# @param coder [Object] forwarded to the parent initializer
# @param channel [Object] channel identifier, kept for later use by #start
# @param options [Hash] remaining keywords, handed to +Redis.new+ as its options hash
def initialize(hub:, coder:, channel:, **options)
  super
  @channel = channel
  @redis_conn = ::Redis.new(options)
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



13
14
15
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 13

# Reader for the channel this subscriber was constructed with.
#
# @return [Object] the value the constructor stored in @channel from its
#   +channel:+ keyword
def channel
  @channel
end

#redis_conn ⇒ Object (readonly)

Returns the value of attribute redis_conn.



13
14
15
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 13

# Reader for the Redis client owned by this subscriber.
#
# @return [Redis] the client the constructor created with +::Redis.new(options)+
def redis_conn
  @redis_conn
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



13
14
15
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 13

# Reader for the background thread running the Redis subscription.
#
# @return [Thread, nil] the thread created by #subscribe, or nil when
#   #subscribe has not been called yet
def thread
  @thread
end

Instance Method Details

#start ⇒ Object



21
22
23
24
25
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 21

# Subscribes to the configured channel and writes an info-level log entry.
#
# @return [Object] whatever the +log+ call returns
def start
  target = channel
  subscribe(target)

  log(:info) { "Subscribed to #{target}" }
end

#stop ⇒ Object



27
28
29
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 27

# Terminates the subscription thread, if one exists.
#
# @return [Thread, nil] the terminated thread, or nil when there is no thread
def stop
  worker = thread
  return if worker.nil?

  worker.terminate
end

#subscribe(channel) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/anycable/rack/broadcast_subscribers/redis_subscriber.rb', line 31

# Starts (at most once) a background thread that subscribes to +channel+ on
# the Redis connection and passes every received message to +handle_message+.
#
# The thread is memoized in @thread, so later calls return the existing
# thread without subscribing again.
#
# @param channel [Object] channel identifier passed to +redis_conn.subscribe+
# @return [Thread] the thread running the subscription
def subscribe(channel)
  @thread ||= Thread.new do
    # An exception escaping this thread is re-raised in the main thread
    # instead of leaving the subscriber silently dead.
    Thread.current.abort_on_exception = true

    redis_conn.without_reconnect do
      redis_conn.subscribe(channel) do |on|
        on.subscribe do |chan, count|
          log(:debug) { "Redis subscriber connected to #{chan} (#{count})" }
        end

        on.unsubscribe do |chan, count|
          log(:debug) { "Redis subscriber disconnected from #{chan} (#{count})" }
        end

        on.message do |_channel, msg|
          handle_message(msg)
        rescue => e
          # A single bad message must not end the subscription; record the
          # cause so the failure can be diagnosed from the log.
          log(:error) { "Failed to broadcast message: #{msg} (#{e.class}: #{e.message})" }
        end
      end
    end
  end
end