Class: CelluloidPubsub::Reactor

Inherits:
Object
  • Object
Includes:
BaseActor
Defined in:
lib/celluloid_pubsub/reactor.rb

Overview

The reactor handles new connections. Based on what the client sends, it subscribes to a channel, unsubscribes from it, or publishes to it. If the command is not one of the available actions, the message is dispatched to the server instead.
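The routing described above can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not the gem's code: `route_message` is a hypothetical helper, and `transform_keys(&:to_s)` stands in for the ActiveSupport `stringify_keys` call that appears in the source listing on this page.

```ruby
require 'json'

# Same values as the AVAILABLE_ACTIONS constant documented on this page.
AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

# Hypothetical helper: reports where a raw client message would be routed.
def route_message(raw)
  parsed = begin
    JSON.parse(raw)
  rescue StandardError
    raw # fall back to the raw message when it is not valid JSON
  end
  # Anything that is not a Hash carries no action and no channel.
  data = parsed.is_a?(Hash) ? parsed.transform_keys(&:to_s) : {}
  action = data['client_action'].to_s
  if AVAILABLE_ACTIONS.include?(action)
    [:delegate, action, data['channel']]
  else
    [:dispatch_to_server, nil, data['channel']]
  end
end

route_message('{"client_action":"subscribe","channel":"news"}')
route_message('{"client_action":"ping","channel":"news"}')
```

The first call is routed to the reactor's own `subscribe` action; the second names an action outside the list, so it would be handed to the server.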

Constant Summary

AVAILABLE_ACTIONS =

available actions that can be delegated

%w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

Instance Attribute Summary

Attributes included from BaseActor

#config

Instance Method Summary

Methods included from BaseActor

boot_up, celluloid_logger_class, celluloid_version, config, included, setup_actor_supervision, version_less_than_eigthteen?, version_less_than_seventeen?

Methods included from Helper

action_subscribe?, #actor_dead?, #cell_actor, fetch_gem_version, filtered_error?, find_loaded_gem, find_loaded_gem_property, get_parsed_version, log_debug, #own_self, parse_options, setup_celluloid_exception_handler, setup_celluloid_logger, setup_log_file, #succesfull_subscription?, verify_gem_version

Instance Attribute Details

#channels ⇒ Array

The channels to which this reactor has subscribed

Returns:

  • (Array)

    array of channels to which the current reactor has subscribed
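How the channel list relates to the server-side subscriber table can be sketched as follows. This is a simplified stand-in, assuming a toy class named `ToySubscriberTable` (hypothetical, not part of the gem) and symbols in place of reactor actors; it only mirrors the bookkeeping visible in the source listing on this page.

```ruby
# Toy stand-in for the server's subscriber table; not the gem's WebServer.
class ToySubscriberTable
  attr_reader :subscribers

  def initialize
    @subscribers = {}  # channel name => array of { reactor:, message: } hashes
    @mutex = Mutex.new # guards concurrent updates to @subscribers
  end

  # Registers a reactor on a channel together with its subscribe message.
  def add(channel, reactor, message)
    @mutex.synchronize do
      (@subscribers[channel] ||= []) << { reactor: reactor, message: message }
    end
  end

  # Removes only the given reactor from the channel's subscriber list.
  def remove(channel, reactor)
    @mutex.synchronize do
      (@subscribers[channel] || []).delete_if { |entry| entry[:reactor] == reactor }
    end
  end
end

table = ToySubscriberTable.new
channels = [] # plays the role of the reactor's #channels array

# Subscribing: remember the channel locally and register on the server side.
channels << 'news'
table.add('news', :reactor_a, { 'client_action' => 'subscribe', 'channel' => 'news' })
table.add('news', :reactor_b, { 'client_action' => 'subscribe', 'channel' => 'news' })

# Unsubscribing: forget the channel locally and drop only this reactor's entry.
channels.delete('news')
table.remove('news', :reactor_a)
```

After the last two lines, `channels` is empty again and only `:reactor_b` remains registered on `'news'`.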



# File 'lib/celluloid_pubsub/reactor.rb', line 18

class Reactor
  include CelluloidPubsub::BaseActor

  # available actions that can be delegated
  AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

  # The websocket connection received from the server
  # @return [Reel::WebSocket] websocket connection
  attr_accessor :websocket

  # The server instance to which this reactor is linked
  # @return [CelluloidPubsub::WebServer] the server actor to which the reactor is connected
  attr_accessor :server

  # The channels to which this reactor has subscribed
  # @return [Array] array of channels to which the current reactor has subscribed
  attr_accessor :channels

  # The same options passed to the server are available on the reactor too
  # @return [Hash] Hash with all the options passed to the server
  attr_reader :options

  finalizer :shutdown
  trap_exit :actor_died

  #  receives a new socket connection from the server
  #  and listens for messages
  #
  # @param  [Reel::WebSocket] websocket
  # @param  [CelluloidPubsub::WebServer] server
  #
  # @return [void]
  #
  # @api public
  def work(websocket, server)
    initialize_data(websocket, server)
    server.reactors << Actor.current
    async.run
  end

  # initializes the actor
  #
  # @param  [Reel::WebSocket] websocket
  # @param  [CelluloidPubsub::WebServer] server
  #
  # @return [Celluloid::Actor] returns the actor
  #
  # @api public
  def initialize_data(websocket, server)
    @websocket = websocket
    @server = server
    @options = @server.server_options
    @channels = []
    @shutting_down = false
    setup_celluloid_logger
    log_debug "#{self.class} Streaming changes for #{websocket.url} #{websocket.class.name}"
    yield(websocket, server) if block_given?
    cell_actor
  end

  # the method will return the file path of the log file where debug messages will be printed
  #
  #
  # @return [String] returns the file path of the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= options.fetch('log_file_path', nil)
  end

  # the method will return the log level of the logger
  #
  # @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
  #
  # @api public
  def log_level
    @log_level ||= options['log_level'] || ::Logger::Severity::INFO
  end

  # the method will return options needed when configuring an adapter
  # @see celluloid_pubsub_redis_adapter for more information
  #
  # @return [Hash] returns options needed by the adapter
  #
  # @api public
  def adapter_options
    @adapter_options ||= options['adapter_options'] || {}
  end

  # the method will return true if the actor is shutting down
  #
  #
  # @return [Boolean] returns true if the actor is shutting down
  #
  # @api public
  def shutting_down?
    @shutting_down == true
  end

  # the method will return true if debug is enabled
  #
  #
  # @return [Boolean] returns true if debug is enabled otherwise false
  #
  # @api public
  def debug_enabled?
    @debug_enabled = options.fetch('enable_debug', false)
    @debug_enabled == true
  end

  # reads from the socket the message
  # and dispatches it to the handle_websocket_message method
  # @see #handle_websocket_message
  #
  # @return [void]
  #
  # @api public
  #
  # :nocov:
  def run
    loop do
      break if shutting_down? || actor_dead?(Actor.current) || @websocket.closed? || actor_dead?(@server)
      message = try_read_websocket
      handle_websocket_message(message) if message.present?
    end
  end
  # :nocov:

  # will try to read the message from the websocket
  # and returns nil if the socket is closed or the read raises an error
  #
  # @return [Object, nil] the message read from the websocket, or nil
  #
  # @api public
  #
  def try_read_websocket
    @websocket.closed? ? nil : @websocket.read
  rescue StandardError
    nil
  end

  # the method will return the reactor's class name used in debug messages
  #
  #
  # @return [Class] returns the reactor's class name used in debug messages
  #
  # @api public
  def reactor_class
    self.class
  end

  # method used to parse a JSON string into a Hash object
  #
  # @param [String] message
  #
  # @return [Object] the parsed JSON (usually a Hash), or the original message if parsing fails
  #
  # @api public
  def parse_json_data(message)
    log_debug "#{reactor_class} received #{message}"
    JSON.parse(message)
  rescue StandardError => e
    log_debug "#{reactor_class} could not parse #{message} because of #{e.inspect}"
    message
  end

  # method that handles the message received from the websocket connection
  # first it will try to parse the message {#parse_json_data} and then it will dispatch
  # it to another method that will decide, depending on the message, what action
  # the reactor should take {#handle_parsed_websocket_message}
  #
  # @see #parse_json_data
  # @see #handle_parsed_websocket_message
  #
  # @param [String] message
  #
  # @return [void]
  #
  # @api public
  def handle_websocket_message(message)
    log_debug "#{reactor_class} read message  #{message}"
    json_data = parse_json_data(message)
    handle_parsed_websocket_message(json_data)
  end

  # method that decides how a parsed message is handled
  #
  # if the data is a Hash, its keys are stringified; when its 'client_action' is one of
  # {AVAILABLE_ACTIONS} the method {#delegate_action} handles the message,
  # otherwise the method {#handle_unknown_action} is called
  #
  # @see #delegate_action
  # @see #handle_unknown_action
  #
  # @param [Hash] json_data
  #
  # @return [void]
  #
  # @api public
  def handle_parsed_websocket_message(json_data)
    data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
    if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
      log_debug "#{self.class} finds actions for  #{json_data}"
      delegate_action(data) if data['client_action'].present?
    else
      handle_unknown_action(data['channel'], json_data)
    end
  end

  # method that calls, asynchronously, the reactor method named by the 'client_action' key
  #
  # the called method receives the channel and the whole message as arguments;
  # the action is checked against {AVAILABLE_ACTIONS} in {#handle_parsed_websocket_message}
  #
  # @see #handle_parsed_websocket_message
  #
  # @param [Hash] json_data
  # @option json_data [String] :client_action The action that decides which reactor method is called
  #
  #   Possible values are:
  #     unsubscribe_all
  #     unsubscribe_clients
  #     unsubscribe
  #     subscribe
  #     publish
  #
  #
  # @return [void]
  #
  # @api public
  def delegate_action(json_data)
    async.send(json_data['client_action'], json_data['channel'], json_data)
  end

  # the method will delegate the message to the server in an asynchronous way by sending the current actor and the message
  # @see CelluloidPubsub::WebServer#handle_dispatched_message
  #
  # @param [String] channel used only in the debug message
  # @param [Hash] json_data
  #
  # @return [void]
  #
  # @api public
  def handle_unknown_action(channel, json_data)
    log_debug "Trying to dispatch   to server  #{json_data} on channel #{channel}"
    @server.async.handle_dispatched_message(Actor.current, json_data)
  end

  # if the reactor's channel list is empty, closes the websocket connection,
  # otherwise deletes the channel from its channel list
  #
  # @param [String] channel  The channel that needs to be deleted from the reactor's list of subscribed channels
  #
  # @return [void]
  #
  # @api public
  def forget_channel(channel)
    if @channels.blank?
      @websocket.close
    else
      @channels.delete(channel)
    end
  end

  # the method will unsubscribe a client by closing the websocket connection if it has unsubscribed from all channels
  # and deleting the reactor from the channel list on the server
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel, _json_data)
    log_debug "#{self.class} runs 'unsubscribe' method with  #{channel}"
    return unless channel.present?
    forget_channel(channel)
    delete_server_subscribers(channel)
  end

  # the method will delete the reactor from the channel list on the server
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def delete_server_subscribers(channel)
    @server.mutex.synchronize do
      (@server.subscribers[channel] || []).delete_if do |hash|
        hash[:reactor] == Actor.current
      end
    end
  end

  # the method will unsubscribe all clients subscribed to a channel by closing their
  # websocket connections, killing their reactors and emptying the channel's subscriber list
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel, _json_data)
    log_debug "#{self.class} runs 'unsubscribe_clients' method with  #{channel}"
    return if channel.blank?
    unsubscribe_from_channel(channel)
    @server.subscribers[channel] = []
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    @shutting_down = true
    log_debug "#{self.class} tries to 'shutdown'"
    @websocket.close if @websocket.present? && !@websocket.closed?
    terminate
  end

  # this method will add the current actor to the list of the subscribers {#add_subscriber_to_channel}
  # and, when the server uses the classic adapter, will write to the socket a message for successful subscription
  #
  # @see #add_subscriber_to_channel
  #
  # @param [String] channel
  # @param [Object] message
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, message)
    return unless channel.present?
    add_subscriber_to_channel(channel, message)
    log_debug "#{self.class} subscribed to #{channel} with #{message}"
    @websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
  end

  # this method will write to the socket all messages that were published
  # to that channel before the actor subscribed
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def send_unpublished(channel)
    return if (messages = unpublished_messages(channel)).blank?
    messages.each do |msg|
      @websocket << msg.to_json
    end
  end

  # the method clears all the messages left unpublished in a channel
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def clear_unpublished_messages(channel)
    CelluloidPubsub::Registry.messages[channel] = []
  end

  # the method will return a list of all unpublished messages in a channel
  #
  # @param [String] channel
  #
  # @return [Array] the list of messages that were not published
  #
  # @api public
  def unpublished_messages(channel)
    (messages = CelluloidPubsub::Registry.messages[channel]).present? ? messages : []
  end

  # this method will return a list of all subscribers to a particular channel or an empty array
  #
  #
  # @param [String] channel The channel that will be used to fetch all subscribers from this channel
  #
  # @return [Array] returns a list of all subscribers to a particular channel or an empty array
  #
  # @api public
  def channel_subscribers(channel)
    @server.subscribers[channel] || []
  end

  # adds the current actor to the list of the subscribers for a particular channel
  # and registers the new channel
  #
  # @param [String] channel
  # @param [Object] message
  #
  # @return [void]
  #
  # @api public
  def add_subscriber_to_channel(channel, message)
    registry_channels = CelluloidPubsub::Registry.channels
    @channels << channel
    registry_channels << channel unless registry_channels.include?(channel)
    @server.mutex.synchronize do
      @server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
    end
  end

  #  method for publishing data to a channel
  #
  # @param [String] current_topic The Channel to which the reactor instance {CelluloidPubsub::Reactor} will publish the message to
  # @param [Object] json_data The additional data that contains the message that needs to be sent
  #
  # @return [void]
  #
  # @api public
  def publish(current_topic, json_data)
    message = json_data['data'].to_json
    return if current_topic.blank? || message.blank?
    server_publish_event(current_topic, message)
  rescue StandardError => e
    log_debug("could not publish message #{message} into topic #{current_topic} because of #{e.inspect}")
  end

  # the method will publish a message to all subscribers of a channel
  #
  # @param [String] current_topic
  # @param [#to_s] message
  #
  # @return [void]
  #
  # @api public
  def server_publish_event(current_topic, message)
    if (subscribers = @server.subscribers[current_topic]).present?
      subscribers.dup.pmap do |hash|
        hash[:reactor].websocket << message
      end
    else
      save_unpublished_message(current_topic, message)
    end
  end

  # the method saves the message for a specific channel if there are no subscribers
  #
  # @param [String] current_topic
  # @param [#to_s] message
  #
  # @return [void]
  #
  # @api public
  def save_unpublished_message(current_topic, message)
    @server.timers_mutex.synchronize do
      (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
    end
  end

  # unsubscribes all actors from all channels and terminates the current actor
  #
  # @param [String] _channel NOT USED - needed to maintain compatibility with the other methods
  # @param [Object] json_data NOT USED - needed to maintain compatibility with the other methods
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all(_channel, json_data)
    log_debug "#{self.class} runs 'unsubscribe_all' method"
    CelluloidPubsub::Registry.channels.dup.pmap do |channel|
      unsubscribe_clients(channel, json_data)
    end
    log_debug 'clearing connections'
    shutdown
  end

  # unsubscribes all actors from the specified channel
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def unsubscribe_from_channel(channel)
    log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
    server_kill_reactors(channel)
  end

  # kills all reactors registered on a channel and closes their websocket connection
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def server_kill_reactors(channel)
    @server.mutex.synchronize do
      (@server.subscribers[channel] || []).dup.pmap do |hash|
        reactor = hash[:reactor]
        reactor.websocket.close
        Celluloid::Actor.kill(reactor)
      end
    end
  end

  # method called when a linked actor exits (registered with trap_exit)
  #
  # @param [actor] actor - the actor that exited
  # @param [Exception, nil] reason - the reason it crashed
  #
  # @return [void]
  #
  # @api public
  def actor_died(actor, reason)
    @shutting_down = true
    log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end
end
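The publish path in the listing above either writes to every subscriber's socket or keeps the message for later. A minimal sketch of that decision follows, assuming plain hashes for the subscriber table and the backlog, arrays in place of websockets, and a sequential `each` instead of the parallel `pmap` used in the listing. `publish_or_buffer` is a hypothetical name, not a method of the gem.

```ruby
require 'json'

# subscribers: channel => array of { socket: } hashes (sockets are plain arrays here)
# backlog:     channel => array of messages nobody has received yet
def publish_or_buffer(subscribers, backlog, topic, data)
  return :skipped if topic.nil? || topic.empty?

  message = data.to_json
  targets = subscribers[topic]
  if targets && !targets.empty?
    # Iterate over a copy so the list may change while messages are written.
    targets.dup.each { |entry| entry[:socket] << message }
    :delivered
  else
    # No subscribers yet: keep the message so it can be sent later.
    (backlog[topic] ||= []) << message
    :buffered
  end
end

subscribers = { 'news' => [{ socket: [] }] }
backlog = {}

publish_or_buffer(subscribers, backlog, 'news', { 'title' => 'hi' })
publish_or_buffer(subscribers, backlog, 'sports', { 'score' => 3 })
```

The first call reaches the single `'news'` subscriber, while the second finds no subscribers for `'sports'` and lands in the backlog.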

#options ⇒ Hash (readonly)

The same options passed to the server are available on the reactor too

Returns:

  • (Hash)

    Hash with all the options passed to the server
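The reactor reads several settings from this hash and falls back to defaults when a key is missing. The sketch below collects those lookups in one place; `effective_settings` is a hypothetical helper, and the string keys are the ones that appear in the source listing on this page.

```ruby
require 'logger'

# Hypothetical helper: resolves the settings a reactor derives from its options hash.
def effective_settings(options)
  {
    log_file_path: options.fetch('log_file_path', nil),
    # Falls back to Logger::Severity::INFO when no level is configured.
    log_level: options['log_level'] || ::Logger::Severity::INFO,
    adapter_options: options['adapter_options'] || {},
    # Only the literal value true enables debugging; truthy strings do not.
    debug_enabled: options.fetch('enable_debug', false) == true
  }
end

effective_settings({})
effective_settings({ 'enable_debug' => true, 'log_level' => ::Logger::Severity::DEBUG })
```

Note the strict comparison for `'enable_debug'`: a value such as `'yes'` would leave debugging disabled.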



# File 'lib/celluloid_pubsub/reactor.rb', line 38

def options
  @options
end

#server ⇒ CelluloidPubsub::WebServer

The server instance to which this reactor is linked

Returns:

  • (CelluloidPubsub::WebServer)

    the server actor to which the reactor is connected



# File 'lib/celluloid_pubsub/reactor.rb', line 18

class Reactor
  include CelluloidPubsub::BaseActor

  # available actions that can be delegated
  AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

  # The websocket connection received from the server
  # @return [Reel::WebSocket] websocket connection
  attr_accessor :websocket

  # The server instance to which this reactor is linked
  # @return [CelluloidPubsub::WebServer] the server actor to which the reactor is connected
  attr_accessor :server

  # The channels to which this reactor has subscribed
  # @return [Array] array of channels to which the current reactor has subscribed
  attr_accessor :channels

  # The same options passed to the server are available on the reactor too
  # @return [Hash] Hash with all the options passed to the server
  attr_reader :options

  finalizer :shutdown
  trap_exit :actor_died

  #  receives a new socket connection from the server
  #  and listens for messages
  #
  # @param  [Reel::WebSocket] websocket
  # @param  [CelluloidPubsub::WebServer] server
  #
  # @return [void]
  #
  # @api public
  def work(websocket, server)
    initialize_data(websocket, server)
    server.reactors << Actor.current
    async.run
  end

  # initializes the actor
  #
  # @param  [Reel::WebSocket] websocket
  # @param  [CelluloidPubsub::WebServer] server
  #
  # @return [Celluloid::Actor] returns the actor
  #
  # @api public
  def initialize_data(websocket, server)
    @websocket = websocket
    @server = server
    @options = @server.server_options
    @channels = []
    @shutting_down = false
    setup_celluloid_logger
    log_debug "#{self.class} Streaming changes for #{websocket.url} #{websocket.class.name}"
    yield(websocket, server) if block_given?
    cell_actor
  end

  # the method will return the file path of the log file where debug messages will be printed
  #
  #
  # @return [String] returns the file path of the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= options.fetch('log_file_path', nil)
  end

  # the method will return the log level of the logger
  #
  # @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
  #
  # @api public
  def log_level
    @log_level ||= options['log_level'] || ::Logger::Severity::INFO
  end

  # the method will return options needed when configuring an adapter
  # @see celluloid_pubsub_redis_adapter for more information
  #
  # @return [Hash] returns options needed by the adapter
  #
  # @api public
  def adapter_options
    @adapter_options ||= options['adapter_options'] || {}
  end

  # the method will return true if the actor is shutting down
  #
  #
  # @return [Boolean] returns true if the actor is shutting down
  #
  # @api public
  def shutting_down?
    @shutting_down == true
  end

  # the method will return true if debug is enabled
  #
  #
  # @return [Boolean] returns true if debug is enabled otherwise false
  #
  # @api public
  def debug_enabled?
    @debug_enabled = options.fetch('enable_debug', false)
    @debug_enabled == true
  end

  # reads from the socket the message
  # and dispatches it to the handle_websocket_message method
  # @see #handle_websocket_message
  #
  # @return [void]
  #
  # @api public
  #
  # :nocov:
  def run
    loop do
      break if shutting_down? || actor_dead?(Actor.current) || @websocket.closed? || actor_dead?(@server)
      message = try_read_websocket
      handle_websocket_message(message) if message.present?
    end
  end
  # :nocov:

  # will try to read the message from the websocket
  # and returns nil if the socket is closed or the read raises an error
  #
  # @return [Object, nil] the message read from the websocket, or nil
  #
  # @api public
  #
  def try_read_websocket
    @websocket.closed? ? nil : @websocket.read
  rescue StandardError
    nil
  end

  # the method will return the reactor's class name used in debug messages
  #
  #
  # @return [Class] returns the reactor's class name used in debug messages
  #
  # @api public
  def reactor_class
    self.class
  end

  # method used to parse a JSON string into a Hash object
  #
  # @param [String] message
  #
  # @return [Object] the parsed JSON (usually a Hash), or the original message if parsing fails
  #
  # @api public
  def parse_json_data(message)
    log_debug "#{reactor_class} received #{message}"
    JSON.parse(message)
  rescue StandardError => e
    log_debug "#{reactor_class} could not parse #{message} because of #{e.inspect}"
    message
  end

  # method that handles the message received from the websocket connection
  # first it will try to parse the message {#parse_json_data} and then it will dispatch
  # it to another method that will decide, depending on the message, what action
  # the reactor should take {#handle_parsed_websocket_message}
  #
  # @see #parse_json_data
  # @see #handle_parsed_websocket_message
  #
  # @param [String] message
  #
  # @return [void]
  #
  # @api public
  def handle_websocket_message(message)
    log_debug "#{reactor_class} read message  #{message}"
    json_data = parse_json_data(message)
    handle_parsed_websocket_message(json_data)
  end

  # method that decides how a parsed message is handled
  #
  # if the data is a Hash, its keys are stringified; when its 'client_action' is one of
  # {AVAILABLE_ACTIONS} the method {#delegate_action} handles the message,
  # otherwise the method {#handle_unknown_action} is called
  #
  # @see #delegate_action
  # @see #handle_unknown_action
  #
  # @param [Hash] json_data
  #
  # @return [void]
  #
  # @api public
  def handle_parsed_websocket_message(json_data)
    data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
    if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
      log_debug "#{self.class} finds actions for  #{json_data}"
      delegate_action(data) if data['client_action'].present?
    else
      handle_unknown_action(data['channel'], json_data)
    end
  end

  # method that calls, asynchronously, the reactor method named by the 'client_action' key
  #
  # the called method receives the channel and the whole message as arguments;
  # the action is checked against {AVAILABLE_ACTIONS} in {#handle_parsed_websocket_message}
  #
  # @see #handle_parsed_websocket_message
  #
  # @param [Hash] json_data
  # @option json_data [String] :client_action The action that decides which reactor method is called
  #
  #   Possible values are:
  #     unsubscribe_all
  #     unsubscribe_clients
  #     unsubscribe
  #     subscribe
  #     publish
  #
  #
  # @return [void]
  #
  # @api public
  def delegate_action(json_data)
    async.send(json_data['client_action'], json_data['channel'], json_data)
  end

  # the method will delegate the message to the server in an asynchronous way by sending the current actor and the message
  # @see CelluloidPubsub::WebServer#handle_dispatched_message
  #
  # @param [String] channel used only in the debug message
  # @param [Hash] json_data
  #
  # @return [void]
  #
  # @api public
  def handle_unknown_action(channel, json_data)
    log_debug "Trying to dispatch   to server  #{json_data} on channel #{channel}"
    @server.async.handle_dispatched_message(Actor.current, json_data)
  end

  # if the reactor's channel list is empty, closes the websocket connection,
  # otherwise deletes the channel from its channel list
  #
  # @param [String] channel  The channel that needs to be deleted from the reactor's list of subscribed channels
  #
  # @return [void]
  #
  # @api public
  def forget_channel(channel)
    if @channels.blank?
      @websocket.close
    else
      @channels.delete(channel)
    end
  end

  # the method will unsubscribe a client by closing the websocket connection if it has unsubscribed from all channels
  # and deleting the reactor from the channel list on the server
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel, _json_data)
    log_debug "#{self.class} runs 'unsubscribe' method with  #{channel}"
    return unless channel.present?
    forget_channel(channel)
    delete_server_subscribers(channel)
  end

  # the method will delete the reactor from the channel list on the server
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def delete_server_subscribers(channel)
    @server.mutex.synchronize do
      (@server.subscribers[channel] || []).delete_if do |hash|
        hash[:reactor] == Actor.current
      end
    end
  end

  # the method will unsubscribe all clients subscribed to a channel by closing their
  # websocket connections, killing their reactors and emptying the channel's subscriber list
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel, _json_data)
    log_debug "#{self.class} runs 'unsubscribe_clients' method with  #{channel}"
    return if channel.blank?
    unsubscribe_from_channel(channel)
    @server.subscribers[channel] = []
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    @shutting_down = true
    log_debug "#{self.class} tries to 'shutdown'"
    @websocket.close if @websocket.present? && !@websocket.closed?
    terminate
  end

  # this method will add the current actor to the list of the subscribers {#add_subscriber_to_channel}
  # and, when the server uses the classic adapter, will write to the socket a message for successful subscription
  #
  # @see #add_subscriber_to_channel
  #
  # @param [String] channel
  # @param [Object] message
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, message)
    return unless channel.present?
    add_subscriber_to_channel(channel, message)
    log_debug "#{self.class} subscribed to #{channel} with #{message}"
    @websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
  end

  # this method will write to the socket all messages that were published
  # to that channel before the actor subscribed
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def send_unpublished(channel)
    return if (messages = unpublished_messages(channel)).blank?
    messages.each do |msg|
      @websocket << msg.to_json
    end
  end

  # the method clears all the messages left unpublished in a channel
  #
  # @param [String] channel
  #
  # @return [void]
  #
  # @api public
  def clear_unpublished_messages(channel)
    CelluloidPubsub::Registry.messages[channel] = []
  end

  # the method will return a list of all unpublished messages in a channel
  #
  # @param [String] channel
  #
  # @return [Array] the list of messages that were not published
  #
  # @api public
  def unpublished_messages(channel)
    (messages = CelluloidPubsub::Registry.messages[channel]).present? ? messages : []
  end

  # this method will return a list of all subscribers to a particular channel or an empty array
  #
  #
  # @param [String] channel The channel that will be used to fetch all subscribers from this channel
  #
  # @return [Array] returns a list of all subscribers to a particular channel or an empty array
  #
  # @api public
  def channel_subscribers(channel)
    @server.subscribers[channel] || []
  end

  # adds the current actor to the list of the subscribers for a particular channel
  # and registers the new channel
  #
  # @param [String] channel
  # @param [Object] message
  #
  # @return [void]
  #
  # @api public
  def add_subscriber_to_channel(channel, message)
    registry_channels = CelluloidPubsub::Registry.channels
    @channels << channel
    registry_channels << channel unless registry_channels.include?(channel)
    @server.mutex.synchronize do
      @server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
    end
  end

  #  method for publishing data to a channel
  #
  # @param [String] current_topic The channel to which the reactor instance {CelluloidPubsub::Reactor} will publish the message
  # @param [Object] json_data The additional data that contains the message that needs to be sent
  #
  # @return [void]
  #
  # @api public
  def publish(current_topic, json_data)
    message = json_data['data'].to_json
    return if current_topic.blank? || message.blank?
    server_publish_event(current_topic, message)
  rescue StandardError => e
    log_debug("could not publish message #{message} into topic #{current_topic} because of #{e.inspect}")
  end

  # the method will publish a message to all subscribers of a channel
  #
  # @param [String] current_topic
  # @param [#to_s] message
  #
  # @return [void]
  #
  # @api public
  def server_publish_event(current_topic, message)
    if (subscribers = @server.subscribers[current_topic]).present?
      subscribers.dup.pmap do |hash|
        hash[:reactor].websocket << message
      end
    else
      save_unpublished_message(current_topic, message)
    end
  end

  # the method saves the message for a specific channel if there are no subscribers
  #
  # @param [String] current_topic
  # @param [#to_s] message
  #
  # @return [void]
  #
  # @api public
  def save_unpublished_message(current_topic, message)
    @server.timers_mutex.synchronize do
      (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
    end
  end

  # unsubscribes all actors from all channels and terminates the current actor
  #
  # @param [String] _channel NOT USED - needed to maintain compatibility with the other methods
  # @param [Object] json_data passed on unchanged to {#unsubscribe_clients} for each channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all(_channel, json_data)
    log_debug "#{self.class} runs 'unsubscribe_all' method"
    CelluloidPubsub::Registry.channels.dup.pmap do |channel|
      unsubscribe_clients(channel, json_data)
    end
    log_debug 'clearing connections'
    shutdown
  end

  # unsubscribes all actors from the specified channel
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def unsubscribe_from_channel(channel)
    log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
    server_kill_reactors(channel)
  end

  # kills all reactors registered on a channel and closes their websocket connection
  #
  # @param [String] channel
  # @return [void]
  #
  # @api public
  def server_kill_reactors(channel)
    @server.mutex.synchronize do
      (@server.subscribers[channel] || []).dup.pmap do |hash|
        reactor = hash[:reactor]
        reactor.websocket.close
        Celluloid::Actor.kill(reactor)
      end
    end
  end

  # method called when the actor is exiting
  #
  # @param [actor] actor - the current actor
  # @param [Hash] reason - the reason it crashed
  #
  # @return [void]
  #
  # @api public
  def actor_died(actor, reason)
    @shutting_down = true
    log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end
end

#websocket ⇒ Reel::WebSocket

The websocket connection received from the server

Returns:

  • (Reel::WebSocket)

    websocket connection




Instance Method Details

#actor_died(actor, reason) ⇒ void

This method returns an undefined value.

method called when the actor is exiting

Parameters:

  • actor (actor)
    • the current actor

  • reason (Hash)
    • the reason it crashed



# File 'lib/celluloid_pubsub/reactor.rb', line 521

def actor_died(actor, reason)
  @shutting_down = true
  log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
end

#adapter_options ⇒ Hash

the method will return options needed when configuring an adapter

Returns:

  • (Hash)

    returns options needed by the adapter

See Also:

  • celluloid_pubsub_redis_adapter for more information


# File 'lib/celluloid_pubsub/reactor.rb', line 102

def adapter_options
  @adapter_options ||= options['adapter_options'] || {}
end

#add_subscriber_to_channel(channel, message) ⇒ void

This method returns an undefined value.

adds the current actor to the list of subscribers for a particular channel and registers the new channel

Parameters:

  • channel (String)
  • message (Object)


# File 'lib/celluloid_pubsub/reactor.rb', line 412

def add_subscriber_to_channel(channel, message)
  registry_channels = CelluloidPubsub::Registry.channels
  @channels << channel
  registry_channels << channel unless registry_channels.include?(channel)
  @server.mutex.synchronize do
    @server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
  end
end

#channel_subscribers(channel) ⇒ Array

this method will return a list of all subscribers to a particular channel or an empty array

Parameters:

  • channel (String)

    The channel that will be used to fetch all subscribers from this channel

Returns:

  • (Array)

    returns a list of all subscribers to a particular channel or an empty array



# File 'lib/celluloid_pubsub/reactor.rb', line 399

def channel_subscribers(channel)
  @server.subscribers[channel] || []
end

#clear_unpublished_messages(channel) ⇒ void

This method returns an undefined value.

the method clears all the messages left unpublished in a channel

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 376

def clear_unpublished_messages(channel)
  CelluloidPubsub::Registry.messages[channel] = []
end

#debug_enabled? ⇒ Boolean

the method will return true if debug is enabled

Returns:

  • (Boolean)

    returns true if debug is enabled otherwise false



# File 'lib/celluloid_pubsub/reactor.rb', line 122

def debug_enabled?
  @debug_enabled = options.fetch('enable_debug', false)
  @debug_enabled == true
end
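
The check above is strict: the option is read under the string key 'enable_debug' and compared with `== true`. A minimal sketch of what that implies, using a hypothetical helper `sketch_debug_enabled?` that takes a plain Hash in place of the reactor's options:

```ruby
# Hypothetical stand-in for the reactor's check; `options` is a plain Hash here.
def sketch_debug_enabled?(options)
  # Same shape as the listing: fetch with a default, then compare against true.
  options.fetch('enable_debug', false) == true
end

puts sketch_debug_enabled?({ 'enable_debug' => true })    # boolean true under the string key
puts sketch_debug_enabled?({ 'enable_debug' => 'true' })  # a String is not == true
puts sketch_debug_enabled?({ enable_debug: true })        # symbol key is never looked up
```

Under this reading, only the boolean `true` stored under the string key turns debugging on.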

#delegate_action(json_data) ⇒ void

This method returns an undefined value.

Dispatches the message asynchronously to the reactor method named by client_action, passing the channel and the full message hash. The action is expected to have been checked against AVAILABLE_ACTIONS beforehand, as #handle_parsed_websocket_message does.

Parameters:

  • json_data (Hash)

Options Hash (json_data):

  • :client_action (String)

    The action that determines which reactor method handles the message

    Possible values are:

    unsubscribe_all
    unsubscribe_clients
    unsubscribe
    subscribe
    publish
    

See Also:

  • #handle_unknown_action



# File 'lib/celluloid_pubsub/reactor.rb', line 247

def delegate_action(json_data)
  async.send(json_data['client_action'], json_data['channel'], json_data)
end
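
Because the method name comes from client input, the allow-list check matters. The following is a minimal sketch of name-based dispatch behind an allow-list, without Celluloid; `SketchHandler`, `ALLOWED`, and `delegate` are hypothetical names, and the call is synchronous rather than `async`:

```ruby
# Hypothetical handler: only names in ALLOWED can be invoked from a message.
class SketchHandler
  ALLOWED = %w[subscribe publish].freeze

  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(channel, _data)
    @calls << [:subscribe, channel]
  end

  def publish(channel, data)
    @calls << [:publish, channel, data['data']]
  end

  # Looks up the action by name and calls it with (channel, data).
  # Returns false without calling anything when the name is not allowed.
  def delegate(data)
    action = data['client_action'].to_s
    return false unless ALLOWED.include?(action)

    public_send(action, data['channel'], data)
    true
  end
end
```

A message such as `{ 'client_action' => 'instance_eval' }` is rejected here before any method lookup happens.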

#delete_server_subscribers(channel) ⇒ void

This method returns an undefined value.

the method will delete the reactor from the channel list on the server

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 302

def delete_server_subscribers(channel)
  @server.mutex.synchronize do
    (@server.subscribers[channel] || []).delete_if do |hash|
      hash[:reactor] == Actor.current
    end
  end
end
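
As a rough illustration of the removal step, the sketch below uses a plain Hash and a `Mutex` in place of the server's subscriber table and lock. The helper name `sketch_remove_subscriber` is hypothetical, and symbols stand in for reactor actors:

```ruby
# Hypothetical helper: removes every entry for `reactor` from one channel's list.
# `subscribers` maps channel names to arrays of { reactor:, message: } hashes.
def sketch_remove_subscriber(subscribers, lock, channel, reactor)
  lock.synchronize do
    # A channel with no list falls back to an empty array, so nothing is raised.
    (subscribers[channel] || []).delete_if { |entry| entry[:reactor] == reactor }
  end
end

table = { 'news' => [{ reactor: :a, message: {} }, { reactor: :b, message: {} }] }
sketch_remove_subscriber(table, Mutex.new, 'news', :a)
puts table['news'].map { |entry| entry[:reactor] }.inspect
```

`delete_if` mutates the stored array in place, which is why the result does not need to be assigned back.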

#forget_channel(channel) ⇒ void

This method returns an undefined value.

Closes the websocket connection if the reactor's channel list is already empty; otherwise deletes the channel from the reactor's channel list.

Parameters:

  • channel (String)

    The channel that needs to be deleted from the reactor’s list of subscribed channels



# File 'lib/celluloid_pubsub/reactor.rb', line 272

def forget_channel(channel)
  if @channels.blank?
    @websocket.close
  else
    @channels.delete(channel)
  end
end
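
The order of the check matters: the list is tested for emptiness before the channel is deleted. A small sketch of that control flow, with a hypothetical `SketchChannelList` class and a `closed` flag standing in for closing the websocket:

```ruby
# Hypothetical model of the reactor's channel list; no socket is involved.
class SketchChannelList
  attr_reader :channels, :closed

  def initialize(channels)
    @channels = channels.dup
    @closed = false
  end

  # Same branch order as the listing: test for emptiness first, delete otherwise.
  def forget(channel)
    if @channels.empty?
      @closed = true
    else
      @channels.delete(channel)
    end
  end
end

list = SketchChannelList.new(['news'])
list.forget('news')  # removes the last channel, flag stays false
list.forget('news')  # list is already empty, so the flag is set now
```

In this model, removing the last channel does not set the flag by itself; that only happens on a later call made while the list is empty.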

#handle_parsed_websocket_message(json_data) ⇒ void

This method returns an undefined value.

Routes a parsed message. Non-Hash input is treated as an empty Hash; Hash input has its keys stringified.

If client_action is one of AVAILABLE_ACTIONS, the message is passed to #delegate_action; otherwise it is passed to #handle_unknown_action.

Parameters:

  • json_data (Hash)

See Also:

  • #delegate_action
  • #handle_unknown_action



# File 'lib/celluloid_pubsub/reactor.rb', line 215

def handle_parsed_websocket_message(json_data)
  data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
  if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
    log_debug "#{self.class} finds actions for  #{json_data}"
    delegate_action(data) if data['client_action'].present?
  else
    handle_unknown_action(data['channel'], json_data)
  end
end
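
The routing decision can be sketched in plain Ruby without Celluloid or ActiveSupport. Here `sketch_route` and `SKETCH_ACTIONS` are hypothetical names, `transform_keys(&:to_s)` stands in for `stringify_keys`, and the method returns a description of the route rather than calling anything:

```ruby
# Copy of the action list, under a hypothetical name for this sketch.
SKETCH_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

# Returns [route, action, channel] for a parsed message.
def sketch_route(json_data)
  # Anything that is not a Hash is treated as an empty Hash.
  data = json_data.is_a?(Hash) ? json_data.transform_keys(&:to_s) : {}
  action = data['client_action'].to_s
  if SKETCH_ACTIONS.include?(action)
    [:delegate, action, data['channel']]
  else
    [:dispatch_to_server, nil, data['channel']]
  end
end

p sketch_route({ client_action: 'subscribe', channel: 'news' })
p sketch_route({ 'client_action' => 'ping', 'channel' => 'news' })
p sketch_route('not a hash')
```

Unparseable input and unknown actions both end up on the server-dispatch branch, the former with a nil channel.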

#handle_unknown_action(channel, json_data) ⇒ void

This method returns an undefined value.

the method will delegate the message to the server asynchronously by sending the current actor and the message

Parameters:

  • json_data (Hash)

See Also:

  • CelluloidPubsub::WebServer#handle_dispatched_message



# File 'lib/celluloid_pubsub/reactor.rb', line 259

def handle_unknown_action(channel, json_data)
  log_debug "Trying to dispatch   to server  #{json_data} on channel #{channel}"
  @server.async.handle_dispatched_message(Actor.current, json_data)
end

#handle_websocket_message(message) ⇒ void

This method returns an undefined value.

Handles a message received from the websocket connection: parses it with #parse_json_data, then passes the result to #handle_parsed_websocket_message, which decides what action the reactor takes.

Parameters:

  • message (JSON)

See Also:

  • #parse_json_data
  • #handle_parsed_websocket_message



# File 'lib/celluloid_pubsub/reactor.rb', line 196

def handle_websocket_message(message)
  log_debug "#{reactor_class} read message  #{message}"
  json_data = parse_json_data(message)
  handle_parsed_websocket_message(json_data)
end

#initialize_data(websocket, server) {|websocket, server| ... } ⇒ Celluloid::Actor

initializes the actor

Parameters:

  • websocket (Reel::WebSocket)
  • server (CelluloidPubsub::WebServer)

Yields:

  • (websocket, server)

Returns:

  • (Celluloid::Actor)

    returns the actor



# File 'lib/celluloid_pubsub/reactor.rb', line 65

def initialize_data(websocket, server)
  @websocket = websocket
  @server = server
  @options = @server.server_options
  @channels = []
  @shutting_down = false
  setup_celluloid_logger
  log_debug "#{self.class} Streaming changes for #{websocket.url} #{websocket.class.name}"
  yield(websocket, server) if block_given?
  cell_actor
end

#log_file_path ⇒ String

the method will return the file path of the log file where debug messages will be printed

Returns:

  • (String)

    returns the file path of the log file where debug messages will be printed



# File 'lib/celluloid_pubsub/reactor.rb', line 83

def log_file_path
  @log_file_path ||= options.fetch('log_file_path', nil)
end

#log_level ⇒ Integer?

the method will return the log level of the logger

Returns:

  • (Integer, nil)

    returns the log level used by the logger (default is 1, info)



# File 'lib/celluloid_pubsub/reactor.rb', line 92

def log_level
  @log_level ||= options['log_level'] || ::Logger::Severity::INFO
end

#parse_json_data(message) ⇒ Hash

Parses a JSON string into a Hash. If parsing fails, the error is logged and the original message is returned unchanged.

Parameters:

  • message (JSON)

Returns:

  • (Hash)


# File 'lib/celluloid_pubsub/reactor.rb', line 175

def parse_json_data(message)
  log_debug "#{reactor_class} received #{message}"
  JSON.parse(message)
rescue StandardError => e
  log_debug "#{reactor_class} could not parse #{message} because of #{e.inspect}"
  message
end
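
The parse-or-pass-through behaviour can be tried in isolation. This is a minimal sketch using only the standard `json` library; `sketch_parse` is a hypothetical name and the logging calls are left out:

```ruby
require 'json'

# Hypothetical helper: returns the parsed value, or the input itself
# when parsing raises (for example JSON::ParserError or TypeError).
def sketch_parse(message)
  JSON.parse(message)
rescue StandardError
  message
end

p sketch_parse('{"client_action":"subscribe","channel":"news"}')
p sketch_parse('plain text, not JSON')
```

Because a failed parse returns the raw String, callers need to handle non-Hash values, as #handle_parsed_websocket_message does.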

#publish(current_topic, json_data) ⇒ void

This method returns an undefined value.

method for publishing data to a channel

Parameters:

  • current_topic (String)

    The channel to which the reactor instance CelluloidPubsub::Reactor will publish the message

  • json_data (Object)

    The additional data that contains the message that needs to be sent



# File 'lib/celluloid_pubsub/reactor.rb', line 429

def publish(current_topic, json_data)
  message = json_data['data'].to_json
  return if current_topic.blank? || message.blank?
  server_publish_event(current_topic, message)
rescue StandardError => e
  log_debug("could not publish message #{message} into topic #{current_topic} because of #{e.inspect}")
end

#reactor_class ⇒ Class

the method will return the reactor’s class name used in debug messages

Returns:

  • (Class)

    returns the reactor’s class name used in debug messages



# File 'lib/celluloid_pubsub/reactor.rb', line 164

def reactor_class
  self.class
end

#run ⇒ void

This method returns an undefined value.

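Reads messages from the socket in a loop and dispatches each one to #handle_websocket_message. The loop stops when the reactor is shutting down, the websocket is closed, or the reactor or server actor is dead.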



# File 'lib/celluloid_pubsub/reactor.rb', line 136

def run
  loop do
    break if shutting_down? || actor_dead?(Actor.current) || @websocket.closed? || actor_dead?(@server)
    message = try_read_websocket
    handle_websocket_message(message) if message.present?
  end
end

#save_unpublished_message(current_topic, message) ⇒ void

This method returns an undefined value.

the method saves the message for a specific channel if there are no subscribers

Parameters:

  • current_topic (String)
  • message (#to_s)


# File 'lib/celluloid_pubsub/reactor.rb', line 463

def save_unpublished_message(current_topic, message)
  @server.timers_mutex.synchronize do
    (CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
  end
end

#send_unpublished(channel) ⇒ void

This method returns an undefined value.

this method will write to the socket all messages that were published to that channel before the actor subscribed

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 362

def send_unpublished(channel)
  return if (messages = unpublished_messages(channel)).blank?
  messages.each do |msg|
    @websocket << msg.to_json
  end
end
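
One detail worth checking when relying on replay: in the listings on this page, #publish already converts the payload with `to_json` before it is stored, and #send_unpublished calls `to_json` again on each stored message. The sketch below, with the hypothetical helpers `sketch_encode_once` and `sketch_encode_for_replay`, shows what a second `to_json` does to a String. Whether callers compensate for this elsewhere is not visible here.

```ruby
require 'json'

# First encoding, as done when a payload is published.
def sketch_encode_once(data)
  data.to_json
end

# Second encoding, as done when a stored (already encoded) message is replayed.
def sketch_encode_for_replay(stored)
  stored.to_json
end

once = sketch_encode_once({ 'text' => 'hi' })
twice = sketch_encode_for_replay(once)
puts once   # a JSON object
puts twice  # a JSON string literal wrapping that object
```

A client would therefore need to decode a replayed message twice to get back the original object, under the assumptions above.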

#server_kill_reactors(channel) ⇒ void

This method returns an undefined value.

kills all reactors registered on a channel and closes their websocket connection

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 503

def server_kill_reactors(channel)
  @server.mutex.synchronize do
    (@server.subscribers[channel] || []).dup.pmap do |hash|
      reactor = hash[:reactor]
      reactor.websocket.close
      Celluloid::Actor.kill(reactor)
    end
  end
end

#server_publish_event(current_topic, message) ⇒ void

This method returns an undefined value.

the method will publish a message to all subscribers of a channel

Parameters:

  • current_topic (String)
  • message (#to_s)


# File 'lib/celluloid_pubsub/reactor.rb', line 445

def server_publish_event(current_topic, message)
  if (subscribers = @server.subscribers[current_topic]).present?
    subscribers.dup.pmap do |hash|
      hash[:reactor].websocket << message
    end
  else
    save_unpublished_message(current_topic, message)
  end
end
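
The deliver-or-buffer rule can be modelled with a small in-memory class. This is a sketch only: `SketchBroker` is a hypothetical name, plain arrays stand in for websockets, delivery is sequential instead of using `pmap`, and a single `Mutex` replaces the server's separate locks.

```ruby
# Hypothetical in-memory broker illustrating "deliver if anyone listens, else buffer".
class SketchBroker
  attr_reader :backlog

  def initialize
    @subscribers = {}
    @backlog = {}
    @lock = Mutex.new
  end

  def subscribe(channel, socket)
    @lock.synchronize { (@subscribers[channel] ||= []) << socket }
  end

  def publish(channel, message)
    # Work on a copy so the subscriber list can change during delivery.
    targets = @lock.synchronize { (@subscribers[channel] || []).dup }
    if targets.empty?
      @lock.synchronize { (@backlog[channel] ||= []) << message }
    else
      targets.each { |socket| socket << message }
    end
  end
end

broker = SketchBroker.new
broker.publish('news', 'first')   # no subscribers yet, so it is buffered
inbox = []
broker.subscribe('news', inbox)
broker.publish('news', 'second')  # delivered to inbox
```

Note that in this model, as in the listing, subscribing does not by itself replay the backlog; that is a separate step (#send_unpublished).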

#shutdown ⇒ void

This method returns an undefined value.

the method will terminate the current actor



# File 'lib/celluloid_pubsub/reactor.rb', line 330

def shutdown
  @shutting_down = true
  log_debug "#{self.class} tries to 'shutdown'"
  @websocket.close if @websocket.present? && !@websocket.closed?
  terminate
end

#shutting_down? ⇒ Boolean

the method will return true if the actor is shutting down

Returns:

  • (Boolean)

    returns true if the actor is shutting down



# File 'lib/celluloid_pubsub/reactor.rb', line 112

def shutting_down?
  @shutting_down == true
end
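
A minimal sketch of how the shutdown flag and the guarded close fit together, assuming a `StringIO` in place of the websocket and omitting the Celluloid `terminate` call. `WorkerSketch` is a hypothetical class used only for illustration.

```ruby
require 'stringio'

# Hypothetical model of the shutdown flag; there is no actor to terminate here.
class WorkerSketch
  def initialize(socket)
    @socket = socket
    @shutting_down = false
  end

  def shutting_down?
    @shutting_down == true
  end

  # Set the flag first, then close the socket only if it is still open.
  def shutdown
    @shutting_down = true
    @socket.close if @socket && !@socket.closed?
  end
end

io = StringIO.new
worker = WorkerSketch.new(io)
before = worker.shutting_down?
worker.shutdown
worker.shutdown # the closed? guard makes a second call harmless
after = worker.shutting_down?
puts "before=#{before} after=#{after} closed=#{io.closed?}"
```

Setting the flag before closing lets other code check `shutting_down?` and skip work on a connection that is about to go away.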

#subscribe(channel, message) ⇒ void

This method returns an undefined value.

this method will add the current actor to the channel's list of subscribers (see #add_subscriber_to_channel) and, when the server uses the classic adapter, will write a successful subscription message to the socket

Parameters:

  • channel (String)
  • message (Object)



# File 'lib/celluloid_pubsub/reactor.rb', line 348

def subscribe(channel, message)
  return unless channel.present?
  add_subscriber_to_channel(channel, message)
  log_debug "#{self.class} subscribed to #{channel} with #{message}"
  @websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end
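
The acknowledgement written by `subscribe` is the incoming message with two keys overwritten. The sketch below builds that payload with plain hashes; the shape of the incoming `request` is an assumption, only the `'client_action'` and `'channel'` keys of the reply come from the source above.

```ruby
require 'json'

# Assumed shape of a parsed subscribe request (hypothetical example data).
request = { 'client_action' => 'subscribe', 'channel' => 'news' }

# Same merge as in #subscribe: the request is echoed back with
# 'client_action' replaced by 'successful_subscription'.
ack = request.merge(
  'client_action' => 'successful_subscription',
  'channel' => 'news'
).to_json

parsed = JSON.parse(ack)
puts parsed['client_action']
puts parsed['channel']
```

Because `merge` returns a new hash, the original request is left unchanged.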

#try_read_websocket ⇒ void

This method returns an undefined value.

will try to read a message from the websocket; in the source shown below it returns nil when the websocket is already closed or when the read raises a StandardError



# File 'lib/celluloid_pubsub/reactor.rb', line 152

def try_read_websocket
  @websocket.closed? ? nil : @websocket.read
rescue StandardError
  nil
end
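
The guarded read above can be exercised with a small fake. `FlakySocket` and `try_read` are hypothetical names; the fake exposes only `closed?` and `read`, the two calls the method relies on.

```ruby
# Hypothetical socket that can be open, closed, or failing on read.
class FlakySocket
  def initialize(closed: false, broken: false)
    @closed = closed
    @broken = broken
  end

  def closed?
    @closed
  end

  def read
    raise IOError, 'connection reset' if @broken

    '{"client_action":"subscribe"}'
  end
end

# Same structure as try_read_websocket: skip closed sockets and
# turn any StandardError raised by the read into nil.
def try_read(socket)
  socket.closed? ? nil : socket.read
rescue StandardError
  nil
end

p try_read(FlakySocket.new)
p try_read(FlakySocket.new(closed: true))
p try_read(FlakySocket.new(broken: true))
```

A caller cannot tell a closed socket from a failed read, since both yield nil.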

#unpublished_messages(channel) ⇒ Array

the method will return a list of all unpublished messages in a channel

Parameters:

  • channel (String)

Returns:

  • (Array)

    the list of messages that were not published



# File 'lib/celluloid_pubsub/reactor.rb', line 387

def unpublished_messages(channel)
  (messages = CelluloidPubsub::Registry.messages[channel]).present? ? messages : []
end

#unsubscribe(channel, _json_data) ⇒ void

This method returns an undefined value.

the method will unsubscribe a client from a channel by deleting the reactor from the channel's subscriber list on the server, and will close the websocket connection if the client has unsubscribed from all channels

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 288

def unsubscribe(channel, _json_data)
  log_debug "#{self.class} runs 'unsubscribe' method with  #{channel}"
  return unless channel.present?
  forget_channel(channel)
  delete_server_subscribers(channel)
end

#unsubscribe_all(_channel, json_data) ⇒ void

This method returns an undefined value.

unsubscribes all actors from all channels and terminates the current actor

Parameters:

  • _channel (String)

    NOT USED - needed to maintain compatibility with the other methods

  • json_data (Object)

    forwarded to #unsubscribe_clients, which ignores it - needed to maintain compatibility with the other methods



# File 'lib/celluloid_pubsub/reactor.rb', line 477

def unsubscribe_all(_channel, json_data)
  log_debug "#{self.class} runs 'unsubscribe_all' method"
  CelluloidPubsub::Registry.channels.dup.pmap do |channel|
    unsubscribe_clients(channel, json_data)
  end
  log_debug 'clearing connections'
  shutdown
end

#unsubscribe_clients(channel, _json_data) ⇒ void

This method returns an undefined value.

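the method will unsubscribe all clients subscribed to a channel by closing their websocket connections and killing their reactors (see #unsubscribe_from_channel), and will then reset the channel's subscriber list on the server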

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 317

def unsubscribe_clients(channel, _json_data)
  log_debug "#{self.class} runs 'unsubscribe_clients' method with  #{channel}"
  return if channel.blank?
  unsubscribe_from_channel(channel)
  @server.subscribers[channel] = []
end
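
The close-then-reset sequence above, sketched without actors. `drop_channel` is a hypothetical helper, `StringIO` objects stand in for websockets, and the `:socket` key replaces the `:reactor` entries of the real subscriber list; killing the reactor itself is not modelled.

```ruby
require 'stringio'

# Close every socket registered on the channel while holding the mutex,
# then reset the channel's subscriber list.
def drop_channel(subscribers, mutex, channel)
  return if channel.nil? || channel.empty?

  mutex.synchronize do
    # Iterate over a copy, as the source above does with dup.
    (subscribers[channel] || []).dup.each do |entry|
      socket = entry[:socket]
      socket.close unless socket.closed?
    end
  end
  subscribers[channel] = []
end

sockets = [StringIO.new, StringIO.new]
other = StringIO.new
subscribers = {
  'news' => sockets.map { |s| { socket: s } },
  'sport' => [{ socket: other }]
}

drop_channel(subscribers, Mutex.new, 'news')
drop_channel(subscribers, Mutex.new, '') # blank channel: no-op
puts "news=#{subscribers['news'].inspect} closed=#{sockets.all?(&:closed?)}"
```

Only the named channel is affected; subscribers of other channels keep their connections.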

#unsubscribe_from_channel(channel) ⇒ void

This method returns an undefined value.

unsubscribes all actors from the specified channel

Parameters:

  • channel (String)


# File 'lib/celluloid_pubsub/reactor.rb', line 492

def unsubscribe_from_channel(channel)
  log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
  server_kill_reactors(channel)
end

#work(websocket, server) ⇒ void

This method returns an undefined value.

receives a new socket connection from the server and listens for messages

Parameters:

  • websocket (Reel::WebSocket)


# File 'lib/celluloid_pubsub/reactor.rb', line 51

def work(websocket, server)
  initialize_data(websocket, server)
  server.reactors << Actor.current
  async.run
end
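
Once the reactor is listening, incoming messages are delegated to one of the `AVAILABLE_ACTIONS` listed in the constant summary. The listening loop itself is not shown in this section, so the dispatcher below is only a guess at its general shape: `DispatchSketch`, `handle`, and the assumption that the action name arrives under a `'client_action'` key are all hypothetical.

```ruby
require 'json'

# Copied from the constant summary of this class.
AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze

# Hypothetical dispatcher: records which action would have been called.
class DispatchSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Every action takes (channel, json_data), like the methods documented above.
  AVAILABLE_ACTIONS.each do |action|
    define_method(action) { |channel, _json_data| @calls << [action, channel] }
  end

  # Parse the raw frame and delegate only to whitelisted actions.
  def handle(raw)
    data = JSON.parse(raw)
    action = data['client_action']
    return :unhandled unless AVAILABLE_ACTIONS.include?(action)

    public_send(action, data['channel'], data)
    :dispatched
  end
end

reactor = DispatchSketch.new
first = reactor.handle('{"client_action":"subscribe","channel":"news"}')
second = reactor.handle('{"client_action":"dance","channel":"news"}')
puts "first=#{first} second=#{second} calls=#{reactor.calls.inspect}"
```

Checking the action against a fixed list before calling `public_send` keeps a client from invoking arbitrary methods on the reactor.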