Class: CelluloidPubsub::Client

Inherits:
Object
  • Object
show all
Includes:
BaseActor
Defined in:
lib/celluloid_pubsub/client.rb

Overview

worker that subscribes to a channel or publishes to a channel if it used to subscribe to a channel the worker will dispatch the messages to the actor that made the connection in the first place.

Instance Attribute Summary collapse

Attributes included from BaseActor

#config

Instance Method Summary collapse

Methods included from BaseActor

boot_up, celluloid_logger_class, celluloid_version, config, included, setup_actor_supervision, version_less_than_eigthteen?, version_less_than_seventeen?

Methods included from Helper

action_subscribe?, #actor_dead?, #cell_actor, fetch_gem_version, filtered_error?, find_loaded_gem, find_loaded_gem_property, get_parsed_version, log_debug, #own_self, parse_options, setup_celluloid_exception_handler, setup_celluloid_logger, setup_log_file, #succesfull_subscription?, verify_gem_version

Constructor Details

#initialize(options) ⇒ void

receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to

when receiving messages from a channel

Parameters:

  • options (Hash)

    the options that can be used to connect to webser and send additional data

Options Hash (options):

  • :actor (String)

    The actor that made the connection

  • :channel (String)

    The channel to which the client will subscribe to once the connection is open

  • :log_file_path (String)

    The path to the log file where debug messages will be printed, otherwise will use STDOUT

  • :hostname (String)

    The hostname on which the webserver runs on

  • :port (String)

    The port on which the webserver runs on

  • :path (String)

    The request path that the webserver accepts



50
51
52
53
54
55
56
57
58
59
# File 'lib/celluloid_pubsub/client.rb', line 50

def initialize(options)
  @options = options.stringify_keys!
  @actor ||= @options.fetch('actor', nil)
  @channel ||= @options.fetch('channel', nil)
  @shutting_down = false
  raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
  setup_celluloid_logger
  log_debug "#{@actor.class} starting on #{hostname}:#{port}"
  supervise_actors
end

Instance Attribute Details

#actorCelluloid::Actor

The actor that made the connection

Returns:

  • (Celluloid::Actor)

    actor to which callbacks will be delegated to



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/celluloid_pubsub/client.rb', line 19

class Client
  include CelluloidPubsub::BaseActor

  # The actor that made the connection
  # @return [Celluloid::Actor] actor to which callbacks will be delegated to
  attr_accessor :actor

  #  options that can be used to connect to webser and send additional data
  # @return [Hash] the options that can be used to connect to webser and send additional data
  attr_accessor :options

  # The channel to which the client will subscribe to once the connection is open
  # @return [String] The channel to which the client will subscribe to
  attr_accessor :channel

  finalizer :shutdown
  trap_exit :actor_died
  #  receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
  #  when receiving messages from a channel
  #
  # @param  [Hash]  options the options that can be used to connect to webser and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :channel The channel to which the client will subscribe to once the connection is open
  # @option options [String] :log_file_path The path to the log file where debug messages will be printed, otherwise will use STDOUT
  # @option options [String]:hostname The hostname on which the webserver runs on
  # @option options [String] :port The port on which the webserver runs on
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def initialize(options)
    @options = options.stringify_keys!
    @actor ||= @options.fetch('actor', nil)
    @channel ||= @options.fetch('channel', nil)
    @shutting_down = false
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    setup_celluloid_logger
    log_debug "#{@actor.class} starting on #{hostname}:#{port}"
    supervise_actors
  end

  # the method will return true if the actor is shutting down
  #
  #
  # @return [Boolean] returns true if the actor is shutting down
  #
  # @api public
  def shutting_down?
    @shutting_down == true
  end

  # the method will return the path to the log file where debug messages will be printed
  #
  # @return [String, nil] return the path to the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= @options.fetch('log_file_path', nil)
  end

  # the method will return the log level of the logger
  #
  # @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
  #
  # @api public
  def log_level
    @log_level ||= @options['log_level'] || ::Logger::Severity::INFO
  end

  # the method will link the current actor to the actor that is attached to, and the connection to the current actor
  #
  # @return [void]
  #
  # @api public
  def supervise_actors
    current_actor = Actor.current
    @actor.link current_actor if @actor.respond_to?(:link)
    current_actor.link connection
  end

  # the method will return the client that is used to
  #
  #
  # @return [Celluloid::WebSocket::Client] the websocket connection used to connect to server
  #
  # @api public
  def connection
    @connection ||= CelluloidPubsub::ClientConnection.new("ws://#{hostname}:#{port}#{path}", Actor.current)
  end

  # the method will return the hostname of the server
  #
  #
  # @return [String] the hostname where the server runs on
  #
  # @api public
  def hostname
    @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
  end

  # the method will return the port on which the server accepts connections
  #
  #
  # @return [String] the port on which the server accepts connections
  #
  # @api public
  def port
    @port ||= @options.fetch('port', nil) || CelluloidPubsub::WebServer.find_unused_port
  end

  # the method will return the path of the URL on which the servers acccepts the connection
  #
  #
  # @return [String] the URL path that the server is mounted on
  #
  # @api public
  def path
    @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    @shutting_down = true
    log_debug "#{self.class} tries to 'shutdown'"
    terminate
  end

  #  checks if debug is enabled
  #
  #
  # @return [boolean]
  #
  # @api public
  def debug_enabled?
    @options.fetch('enable_debug', false).to_s == 'true'
  end

  # subscribes to a channel . need to be used inside the connect block passed to the actor
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, data = {})
    log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
    async.send_action('subscribe', channel, data)
  end

  # publishes to a channel some data (can be anything)
  #
  # @param [string] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    send_action('publish', channel, data)
  end

  # unsubscribes current client from a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel)
    send_action('unsubscribe', channel)
  end

  # unsubscribes all clients subscribed to a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel)
    send_action('unsubscribe_clients', channel)
  end

  # unsubscribes all clients from all channels
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all
    send_action('unsubscribe_all')
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    log_debug("#{@actor.class} websocket connection opened")
    async.subscribe(@channel) if @channel.present?
  end

  # callback executes when actor receives a message from a subscribed channel
  # and parses the message using JSON.parse and dispatches the parsed
  # message to the original actor that made the connection
  #
  # @param [JSON] data
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    message = JSON.parse(data)
    log_debug("#{@actor.class} received JSON  #{message}")
    if @actor.respond_to?(:async)
      @actor.async.on_message(message)
    else
      @actor.on_message(message)
    end
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    log_debug("#{self.class} dispatching on close  #{code} #{reason}")
    if @actor.respond_to?(:async)
      @actor.async.on_close(code, reason)
    else
      @actor.on_close(code, reason)
    end
  ensure
    log_debug("#{self.class} closing the connection on close and terminating")
    connection.terminate unless actor_dead?(connection)
    terminate
  end

  private

  # method used to send an action to the webserver reactor , to a chanel and with data
  #
  # @param [String] action
  # @param [String] channel
  # @param [Hash] data
  #
  # @return [void]
  #
  # @api private
  def send_action(action, channel = nil, data = {})
    data = data.is_a?(Hash) ? data : {}
    publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
    async.chat(publishing_data)
  end

  # method used to send messages to the webserver
  # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser
  # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value
  #
  # @param [Hash] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = message.is_a?(Hash) ? message.to_json : JSON.dump(action: 'message', message: message)
    log_debug("#{@actor.class} sends JSON #{final_message}")
    connection.text final_message
  end

  # method called when the actor is exiting
  #
  # @param [actor] actor - the current actor
  # @param [Hash] reason - the reason it crashed
  #
  # @return [void]
  #
  # @api public
  def actor_died(actor, reason)
    @shutting_down = true
    log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end
end

#channelString

The channel to which the client will subscribe to once the connection is open

Returns:

  • (String)

    The channel to which the client will subscribe to



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/celluloid_pubsub/client.rb', line 19

class Client
  include CelluloidPubsub::BaseActor

  # The actor that made the connection
  # @return [Celluloid::Actor] actor to which callbacks will be delegated to
  attr_accessor :actor

  #  options that can be used to connect to webser and send additional data
  # @return [Hash] the options that can be used to connect to webser and send additional data
  attr_accessor :options

  # The channel to which the client will subscribe to once the connection is open
  # @return [String] The channel to which the client will subscribe to
  attr_accessor :channel

  finalizer :shutdown
  trap_exit :actor_died
  #  receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
  #  when receiving messages from a channel
  #
  # @param  [Hash]  options the options that can be used to connect to webser and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :channel The channel to which the client will subscribe to once the connection is open
  # @option options [String] :log_file_path The path to the log file where debug messages will be printed, otherwise will use STDOUT
  # @option options [String]:hostname The hostname on which the webserver runs on
  # @option options [String] :port The port on which the webserver runs on
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def initialize(options)
    @options = options.stringify_keys!
    @actor ||= @options.fetch('actor', nil)
    @channel ||= @options.fetch('channel', nil)
    @shutting_down = false
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    setup_celluloid_logger
    log_debug "#{@actor.class} starting on #{hostname}:#{port}"
    supervise_actors
  end

  # the method will return true if the actor is shutting down
  #
  #
  # @return [Boolean] returns true if the actor is shutting down
  #
  # @api public
  def shutting_down?
    @shutting_down == true
  end

  # the method will return the path to the log file where debug messages will be printed
  #
  # @return [String, nil] return the path to the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= @options.fetch('log_file_path', nil)
  end

  # the method will return the log level of the logger
  #
  # @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
  #
  # @api public
  def log_level
    @log_level ||= @options['log_level'] || ::Logger::Severity::INFO
  end

  # the method will link the current actor to the actor that is attached to, and the connection to the current actor
  #
  # @return [void]
  #
  # @api public
  def supervise_actors
    current_actor = Actor.current
    @actor.link current_actor if @actor.respond_to?(:link)
    current_actor.link connection
  end

  # the method will return the client that is used to
  #
  #
  # @return [Celluloid::WebSocket::Client] the websocket connection used to connect to server
  #
  # @api public
  def connection
    @connection ||= CelluloidPubsub::ClientConnection.new("ws://#{hostname}:#{port}#{path}", Actor.current)
  end

  # the method will return the hostname of the server
  #
  #
  # @return [String] the hostname where the server runs on
  #
  # @api public
  def hostname
    @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
  end

  # the method will return the port on which the server accepts connections
  #
  #
  # @return [String] the port on which the server accepts connections
  #
  # @api public
  def port
    @port ||= @options.fetch('port', nil) || CelluloidPubsub::WebServer.find_unused_port
  end

  # the method will return the path of the URL on which the servers acccepts the connection
  #
  #
  # @return [String] the URL path that the server is mounted on
  #
  # @api public
  def path
    @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    @shutting_down = true
    log_debug "#{self.class} tries to 'shutdown'"
    terminate
  end

  #  checks if debug is enabled
  #
  #
  # @return [boolean]
  #
  # @api public
  def debug_enabled?
    @options.fetch('enable_debug', false).to_s == 'true'
  end

  # subscribes to a channel . need to be used inside the connect block passed to the actor
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, data = {})
    log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
    async.send_action('subscribe', channel, data)
  end

  # publishes to a channel some data (can be anything)
  #
  # @param [string] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    send_action('publish', channel, data)
  end

  # unsubscribes current client from a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel)
    send_action('unsubscribe', channel)
  end

  # unsubscribes all clients subscribed to a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel)
    send_action('unsubscribe_clients', channel)
  end

  # unsubscribes all clients from all channels
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all
    send_action('unsubscribe_all')
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    log_debug("#{@actor.class} websocket connection opened")
    async.subscribe(@channel) if @channel.present?
  end

  # callback executes when actor receives a message from a subscribed channel
  # and parses the message using JSON.parse and dispatches the parsed
  # message to the original actor that made the connection
  #
  # @param [JSON] data
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    message = JSON.parse(data)
    log_debug("#{@actor.class} received JSON  #{message}")
    if @actor.respond_to?(:async)
      @actor.async.on_message(message)
    else
      @actor.on_message(message)
    end
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    log_debug("#{self.class} dispatching on close  #{code} #{reason}")
    if @actor.respond_to?(:async)
      @actor.async.on_close(code, reason)
    else
      @actor.on_close(code, reason)
    end
  ensure
    log_debug("#{self.class} closing the connection on close and terminating")
    connection.terminate unless actor_dead?(connection)
    terminate
  end

  private

  # method used to send an action to the webserver reactor , to a chanel and with data
  #
  # @param [String] action
  # @param [String] channel
  # @param [Hash] data
  #
  # @return [void]
  #
  # @api private
  def send_action(action, channel = nil, data = {})
    data = data.is_a?(Hash) ? data : {}
    publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
    async.chat(publishing_data)
  end

  # method used to send messages to the webserver
  # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser
  # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value
  #
  # @param [Hash] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = message.is_a?(Hash) ? message.to_json : JSON.dump(action: 'message', message: message)
    log_debug("#{@actor.class} sends JSON #{final_message}")
    connection.text final_message
  end

  # method called when the actor is exiting
  #
  # @param [actor] actor - the current actor
  # @param [Hash] reason - the reason it crashed
  #
  # @return [void]
  #
  # @api public
  def actor_died(actor, reason)
    @shutting_down = true
    log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end
end

#optionsHash

options that can be used to connect to webser and send additional data

Returns:

  • (Hash)

    the options that can be used to connect to webser and send additional data



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/celluloid_pubsub/client.rb', line 19

class Client
  include CelluloidPubsub::BaseActor

  # The actor that made the connection
  # @return [Celluloid::Actor] actor to which callbacks will be delegated to
  attr_accessor :actor

  #  options that can be used to connect to webser and send additional data
  # @return [Hash] the options that can be used to connect to webser and send additional data
  attr_accessor :options

  # The channel to which the client will subscribe to once the connection is open
  # @return [String] The channel to which the client will subscribe to
  attr_accessor :channel

  finalizer :shutdown
  trap_exit :actor_died
  #  receives a list of options that are used to connect to the webserver and an actor to which the callbacks are delegated to
  #  when receiving messages from a channel
  #
  # @param  [Hash]  options the options that can be used to connect to webser and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :channel The channel to which the client will subscribe to once the connection is open
  # @option options [String] :log_file_path The path to the log file where debug messages will be printed, otherwise will use STDOUT
  # @option options [String]:hostname The hostname on which the webserver runs on
  # @option options [String] :port The port on which the webserver runs on
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def initialize(options)
    @options = options.stringify_keys!
    @actor ||= @options.fetch('actor', nil)
    @channel ||= @options.fetch('channel', nil)
    @shutting_down = false
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    setup_celluloid_logger
    log_debug "#{@actor.class} starting on #{hostname}:#{port}"
    supervise_actors
  end

  # the method will return true if the actor is shutting down
  #
  #
  # @return [Boolean] returns true if the actor is shutting down
  #
  # @api public
  def shutting_down?
    @shutting_down == true
  end

  # the method will return the path to the log file where debug messages will be printed
  #
  # @return [String, nil] return the path to the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= @options.fetch('log_file_path', nil)
  end

  # the method will return the log level of the logger
  #
  # @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
  #
  # @api public
  def log_level
    @log_level ||= @options['log_level'] || ::Logger::Severity::INFO
  end

  # the method will link the current actor to the actor that is attached to, and the connection to the current actor
  #
  # @return [void]
  #
  # @api public
  def supervise_actors
    current_actor = Actor.current
    @actor.link current_actor if @actor.respond_to?(:link)
    current_actor.link connection
  end

  # the method will return the client that is used to
  #
  #
  # @return [Celluloid::WebSocket::Client] the websocket connection used to connect to server
  #
  # @api public
  def connection
    @connection ||= CelluloidPubsub::ClientConnection.new("ws://#{hostname}:#{port}#{path}", Actor.current)
  end

  # the method will return the hostname of the server
  #
  #
  # @return [String] the hostname where the server runs on
  #
  # @api public
  def hostname
    @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
  end

  # the method will return the port on which the server accepts connections
  #
  #
  # @return [String] the port on which the server accepts connections
  #
  # @api public
  def port
    @port ||= @options.fetch('port', nil) || CelluloidPubsub::WebServer.find_unused_port
  end

  # the method will return the path of the URL on which the servers acccepts the connection
  #
  #
  # @return [String] the URL path that the server is mounted on
  #
  # @api public
  def path
    @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    @shutting_down = true
    log_debug "#{self.class} tries to 'shutdown'"
    terminate
  end

  #  checks if debug is enabled
  #
  #
  # @return [boolean]
  #
  # @api public
  def debug_enabled?
    @options.fetch('enable_debug', false).to_s == 'true'
  end

  # subscribes to a channel . need to be used inside the connect block passed to the actor
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, data = {})
    log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
    async.send_action('subscribe', channel, data)
  end

  # publishes to a channel some data (can be anything)
  #
  # @param [string] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    send_action('publish', channel, data)
  end

  # unsubscribes current client from a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel)
    send_action('unsubscribe', channel)
  end

  # unsubscribes all clients subscribed to a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel)
    send_action('unsubscribe_clients', channel)
  end

  # unsubscribes all clients from all channels
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all
    send_action('unsubscribe_all')
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    log_debug("#{@actor.class} websocket connection opened")
    async.subscribe(@channel) if @channel.present?
  end

  # callback executes when actor receives a message from a subscribed channel
  # and parses the message using JSON.parse and dispatches the parsed
  # message to the original actor that made the connection
  #
  # @param [JSON] data
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    message = JSON.parse(data)
    log_debug("#{@actor.class} received JSON  #{message}")
    if @actor.respond_to?(:async)
      @actor.async.on_message(message)
    else
      @actor.on_message(message)
    end
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    log_debug("#{self.class} dispatching on close  #{code} #{reason}")
    if @actor.respond_to?(:async)
      @actor.async.on_close(code, reason)
    else
      @actor.on_close(code, reason)
    end
  ensure
    log_debug("#{self.class} closing the connection on close and terminating")
    connection.terminate unless actor_dead?(connection)
    terminate
  end

  private

  # method used to send an action to the webserver reactor , to a chanel and with data
  #
  # @param [String] action
  # @param [String] channel
  # @param [Hash] data
  #
  # @return [void]
  #
  # @api private
  def send_action(action, channel = nil, data = {})
    data = data.is_a?(Hash) ? data : {}
    publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
    async.chat(publishing_data)
  end

  # method used to send messages to the webserver
  # checks too see if the message is a hash and if it is it will transform it to JSON and send it to the webser
  # otherwise will construct a JSON object that will have the key action with the value 'message" and the key message witth the parameter's value
  #
  # @param [Hash] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = message.is_a?(Hash) ? message.to_json : JSON.dump(action: 'message', message: message)
    log_debug("#{@actor.class} sends JSON #{final_message}")
    connection.text final_message
  end

  # method called when the actor is exiting
  #
  # @param [actor] actor - the current actor
  # @param [Hash] reason - the reason it crashed
  #
  # @return [void]
  #
  # @api public
  def actor_died(actor, reason)
    @shutting_down = true
    log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
  end
end

Instance Method Details

#connectionCelluloid::WebSocket::Client

the method will return the client that is used to

Returns:

  • (Celluloid::WebSocket::Client)

    the websocket connection used to connect to server



106
107
108
# File 'lib/celluloid_pubsub/client.rb', line 106

def connection
  @connection ||= CelluloidPubsub::ClientConnection.new("ws://#{hostname}:#{port}#{path}", Actor.current)
end

#debug_enabled?boolean

checks if debug is enabled

Returns:

  • (boolean)


158
159
160
# File 'lib/celluloid_pubsub/client.rb', line 158

def debug_enabled?
  @options.fetch('enable_debug', false).to_s == 'true'
end

#hostnameString

the method will return the hostname of the server

Returns:

  • (String)

    the hostname where the server runs on



116
117
118
# File 'lib/celluloid_pubsub/client.rb', line 116

def hostname
  @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
end

#log_file_pathString?

the method will return the path to the log file where debug messages will be printed

Returns:

  • (String, nil)

    return the path to the log file where debug messages will be printed



76
77
78
# File 'lib/celluloid_pubsub/client.rb', line 76

def log_file_path
  @log_file_path ||= @options.fetch('log_file_path', nil)
end

#log_levelInteger?

the method will return the log level of the logger

Returns:

  • (Integer, nil)

    return the log level used by the logger ( default is 1 - info)



85
86
87
# File 'lib/celluloid_pubsub/client.rb', line 85

def log_level
  @log_level ||= @options['log_level'] || ::Logger::Severity::INFO
end

#on_close(code, reason) ⇒ void

This method returns an undefined value.

callback executes when connection closes

Parameters:

  • code (String)
  • reason (String)


255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/celluloid_pubsub/client.rb', line 255

def on_close(code, reason)
  log_debug("#{self.class} dispatching on close  #{code} #{reason}")
  if @actor.respond_to?(:async)
    @actor.async.on_close(code, reason)
  else
    @actor.on_close(code, reason)
  end
ensure
  log_debug("#{self.class} closing the connection on close and terminating")
  connection.terminate unless actor_dead?(connection)
  terminate
end

#on_message(data) ⇒ void

This method returns an undefined value.

callback executes when actor receives a message from a subscribed channel and parses the message using JSON.parse and dispatches the parsed message to the original actor that made the connection

Parameters:

  • data (JSON)


236
237
238
239
240
241
242
243
244
# File 'lib/celluloid_pubsub/client.rb', line 236

def on_message(data)
  message = JSON.parse(data)
  log_debug("#{@actor.class} received JSON  #{message}")
  if @actor.respond_to?(:async)
    @actor.async.on_message(message)
  else
    @actor.on_message(message)
  end
end

#on_openvoid

This method returns an undefined value.

callback executes after connection is opened and delegates action to actor



222
223
224
225
# File 'lib/celluloid_pubsub/client.rb', line 222

def on_open
  log_debug("#{@actor.class} websocket connection opened")
  async.subscribe(@channel) if @channel.present?
end

#pathString

the method will return the path of the URL on which the servers acccepts the connection

Returns:

  • (String)

    the URL path that the server is mounted on



136
137
138
# File 'lib/celluloid_pubsub/client.rb', line 136

def path
  @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
end

#portString

the method will return the port on which the server accepts connections

Returns:

  • (String)

    the port on which the server accepts connections



126
127
128
# File 'lib/celluloid_pubsub/client.rb', line 126

def port
  @port ||= @options.fetch('port', nil) || CelluloidPubsub::WebServer.find_unused_port
end

#publish(channel, data) ⇒ void

This method returns an undefined value.

publishes to a channel some data (can be anything)

Parameters:

  • channel (string)
  • data (#to_s)


182
183
184
# File 'lib/celluloid_pubsub/client.rb', line 182

def publish(channel, data)
  send_action('publish', channel, data)
end

#shutdownvoid

This method returns an undefined value.

the method will terminate the current actor



146
147
148
149
150
# File 'lib/celluloid_pubsub/client.rb', line 146

def shutdown
  @shutting_down = true
  log_debug "#{self.class} tries to 'shutdown'"
  terminate
end

#shutting_down?Boolean

the method will return true if the actor is shutting down

Returns:

  • (Boolean)

    returns true if the actor is shutting down



67
68
69
# File 'lib/celluloid_pubsub/client.rb', line 67

def shutting_down?
  @shutting_down == true
end

#subscribe(channel, data = {}) ⇒ void

This method returns an undefined value.

subscribes to a channel . need to be used inside the connect block passed to the actor

Parameters:

  • channel (string)


169
170
171
172
# File 'lib/celluloid_pubsub/client.rb', line 169

def subscribe(channel, data = {})
  log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
  async.send_action('subscribe', channel, data)
end

#supervise_actorsvoid

This method returns an undefined value.

the method will link the current actor to the actor that is attached to, and the connection to the current actor



94
95
96
97
98
# File 'lib/celluloid_pubsub/client.rb', line 94

def supervise_actors
  current_actor = Actor.current
  @actor.link current_actor if @actor.respond_to?(:link)
  current_actor.link connection
end

#unsubscribe(channel) ⇒ void

This method returns an undefined value.

unsubscribes current client from a channel

Parameters:

  • channel (string)


193
194
195
# File 'lib/celluloid_pubsub/client.rb', line 193

def unsubscribe(channel)
  send_action('unsubscribe', channel)
end

#unsubscribe_allvoid

This method returns an undefined value.

unsubscribes all clients from all channels



213
214
215
# File 'lib/celluloid_pubsub/client.rb', line 213

def unsubscribe_all
  send_action('unsubscribe_all')
end

#unsubscribe_clients(channel) ⇒ void

This method returns an undefined value.

unsubscribes all clients subscribed to a channel

Parameters:

  • channel (string)


204
205
206
# File 'lib/celluloid_pubsub/client.rb', line 204

def unsubscribe_clients(channel)
  send_action('unsubscribe_clients', channel)
end