Class: Atr::Reactor

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::IO, Celluloid::Logger
Defined in:
lib/atr/reactor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(websocket, routing_key_scope = nil) ⇒ Reactor

Returns a new instance of Reactor.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/atr/reactor.rb', line 13

def initialize(websocket, routing_key_scope = nil)
  info "Streaming changes"

  @routing_key_scope = routing_key_scope
  @websocket = websocket

  @subscribers = ::Atr::Registry.scoped_channels(routing_key_scope).map do |channel|
    async.start_subscriber(channel)
  end

  async.run
end

Instance Attribute Details

#routing_key_scopeObject

Returns the value of attribute routing_key_scope.



10
11
12
# File 'lib/atr/reactor.rb', line 10

def routing_key_scope
  @routing_key_scope
end

#subscribersObject

Returns the value of attribute subscribers.



11
12
13
# File 'lib/atr/reactor.rb', line 11

def subscribers
  @subscribers
end

#websocketObject

Returns the value of attribute websocket.



9
10
11
# File 'lib/atr/reactor.rb', line 9

def websocket
  @websocket
end

Instance Method Details

#dispatch_message(message) ⇒ Object



26
27
28
# File 'lib/atr/reactor.rb', line 26

def dispatch_message(message)
  puts message.inspect
end

#runObject



30
31
32
33
34
35
36
37
38
# File 'lib/atr/reactor.rb', line 30

def run
  while message = @websocket.read
    if message == "unsubscribe"
      unsubscribe_all
    else
      dispatch_message(message)
    end
  end
end

#shutdownObject



73
74
75
76
77
# File 'lib/atr/reactor.rb', line 73

def shutdown
  ::Atr::Redis.connection.unsubscribe
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end

#start_subscriber(channel) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/atr/reactor.rb', line 79

def start_subscriber(channel)
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  ::Atr::Redis.connection.subscribe(channel) do |on|
    on.subscribe do |channel, subscriptions|
      puts "Subscribed to ##{channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |channel, subscriptions|
      puts "Unsubscribed from ##{channel} (#{subscriptions} subscriptions)"
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |channel, message|
      shutdown if message == "exit"

      event = Marshal.load(message)

      if ::Atr.config.event_serializer?
        puts "FOUND SERIUALIZER"
        puts ::Atr.config.event_serializer.inspect
        puts ::Atr.config.event_serializer.new(event).to_json
        websocket << ::Atr.config.event_serializer.new(event).to_json
      else
        websocket << event.to_json
      end
    end
  end
end

#start_subscribersObject

todo: decide between starting individually or subscribing all at once and remove one of the methods



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/atr/reactor.rb', line 41

def start_subscribers
  ::Atr::Redis.connect unless ::Atr::Redis.connected?

  ::Atr::Redis.connection.subscribe(::Atr::Registry.scoped_channels(routing_key_scope)) do |on|
    on.subscribe do |channel, subscriptions|
      puts "Subscribed to ##{channel} (#{subscriptions} subscriptions)"
    end

    on.unsubscribe do |channel, subscriptions|
      ::ActiveRecord::Base.clear_active_connections!
      terminate
    end

    on.message do |channel, message|
      shutdown if message == "exit"

      event = Marshal.load(message)

      if ::Atr.config.event_serializer?

        websocket << ::Atr.config.event_serializer.new(event).to_json
      else
        websocket << event.to_json
      end
    end
  end
rescue Reel::SocketError
  info "Client disconnected"
  ::ActiveRecord::Base.clear_active_connections!
  terminate
end

#unsubscribe_allObject



110
111
112
113
114
115
116
117
# File 'lib/atr/reactor.rb', line 110

def unsubscribe_all
  ::Atr::Registry.scoped_channels(routing_key_scope).map do |channel|
    ::Atr::Redis.connection.unsubscribe(channel)
  end

  info "clearing connections"
  terminate
end