Class: ActionCable::SubscriptionAdapter::PostgreSQL::Listener

Inherits:
SubscriberMap::Async show all
Defined in:
lib/action_cable/subscription_adapter/postgresql.rb

Instance Method Summary collapse

Methods inherited from SubscriberMap::Async

#add_subscriber, #invoke_callback, #remove_subscriber

Methods inherited from SubscriberMap

#add_subscriber, #broadcast, #invoke_callback, #remove_subscriber

Constructor Details

#initialize(adapter, executor) ⇒ Listener

Returns a new instance of Listener.



76
77
78
79
80
81
82
83
84
85
86
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 76

# Sets up the command queue and starts the background thread that runs
# #listen.
#
# @param adapter [Object] stored for later use by #listen, which calls
#   +with_subscriptions_connection+ on it
# @param executor [Object] forwarded unchanged to the superclass constructor
def initialize(adapter, executor)
  super(executor)

  @queue = Queue.new
  @adapter = adapter

  @thread = Thread.new do
    # Configured from inside the thread itself so the flag is in place
    # before #listen starts doing any work.
    worker = Thread.current
    worker.abort_on_exception = true
    listen
  end
end

Instance Method Details

#add_channel(channel, on_success) ⇒ Object



119
120
121
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 119

# Enqueues a +:listen+ command for the listener thread.
#
# The command is only queued here; #listen is what actually issues the
# LISTEN statement and, when +on_success+ is non-nil, posts it to the
# executor afterwards.
#
# @param channel [Object] channel name to subscribe to
# @param on_success [#to_proc, nil] callback queued alongside the command
# @return [Object] the result of pushing onto the queue
def add_channel(channel, on_success)
  command = [:listen, channel, on_success]
  @queue << command
end

#listen ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 88

# Main loop of the listener thread.
#
# Obtains a connection from the adapter and then alternates between two
# phases until a +:shutdown+ command is seen:
#
# 1. Drain every command currently in the queue:
#    * +:listen+   - run +LISTEN <escaped channel>+ and, if a callback was
#      queued with it, post that callback to the executor.
#    * +:unlisten+ - run +UNLISTEN <escaped channel>+.
#    * +:shutdown+ - unwind out of the loop via +throw :shutdown+.
# 2. Call +wait_for_notify(1)+ on the connection and broadcast any
#    notification it yields. The argument is presumably a timeout in
#    seconds (TODO confirm against the connection's API); it bounds how
#    long queued commands wait before the next drain.
#
# @return [Object] whatever +with_subscriptions_connection+ returns
def listen
  @adapter.with_subscriptions_connection do |conn|
    catch :shutdown do
      loop do
        # Non-blocking pops are safe here as long as this thread is the only
        # consumer: the emptiness check above each pop guarantees an item.
        until @queue.empty?
          command, name, on_success = @queue.pop(true)

          case command
          when :shutdown
            throw :shutdown
          when :listen
            conn.exec("LISTEN #{conn.escape_identifier(name)}")
            @executor.post(&on_success) if on_success
          when :unlisten
            conn.exec("UNLISTEN #{conn.escape_identifier(name)}")
          end
        end

        # The second yielded value is not used by this method.
        conn.wait_for_notify(1) { |chan, _pid, payload| broadcast(chan, payload) }
      end
    end
  end
end

#remove_channel(channel) ⇒ Object



123
124
125
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 123

# Enqueues an +:unlisten+ command for the listener thread.
#
# The command is only queued here; #listen is what actually issues the
# UNLISTEN statement.
#
# @param channel [Object] channel name to unsubscribe from
# @return [Object] the result of pushing onto the queue
def remove_channel(channel)
  command = [:unlisten, channel]
  @queue << command
end

#shutdown ⇒ Object



114
115
116
117
# File 'lib/action_cable/subscription_adapter/postgresql.rb', line 114

# Asks the listener thread to stop and waits until it has exited.
#
# A +:shutdown+ command is queued, then the calling thread repeatedly
# yields with +Thread.pass+ for as long as the listener thread is alive.
# NOTE(review): this is a polling wait rather than a +Thread#join+;
# switching would change how a failed listener thread surfaces to the
# caller, so confirm before altering.
#
# @return [nil]
def shutdown
  @queue << [:shutdown]

  while @thread.alive?
    Thread.pass
  end
end