Class: CelluloidPubsub::Client

Inherits:
Object
  • Object
Includes:
BaseActor
Defined in:
lib/celluloid_pubsub/client.rb

Overview

A worker that subscribes to a channel or publishes to a channel. When it is used to subscribe to a channel, the worker dispatches incoming messages to the actor that made the connection in the first place.

Instance Attribute Summary

Instance Method Summary

Methods included from BaseActor

celluloid_logger_class, celluloid_version, included, setup_actor_supervision, version_less_than_seventeen?

Methods included from Helper

action_subscribe?, fetch_gem_version, filtered_error?, find_loaded_gem, find_loaded_gem_property, get_parsed_version, log_debug, parse_options, setup_celluloid_exception_handler, setup_celluloid_logger, setup_log_file, #succesfull_subscription?, verify_gem_version

Constructor Details

#initialize(options) ⇒ void

Receives a hash of options used to connect to the webserver, along with an actor to which callbacks are delegated when messages are received from a channel.

Parameters:

  • options (Hash)

    the options that can be used to connect to the webserver and send additional data

Options Hash (options):

  • :actor (String)

    The actor that made the connection

  • :channel (String)

    The channel to which the client subscribes once the connection is open

  • :log_file_path (String)

    The path to the log file where debug messages are written; STDOUT is used if it is not set

  • :hostname (String)

    The hostname on which the webserver runs

  • :port (String)

    The port on which the webserver runs

  • :path (String)

    The request path that the webserver accepts



# File 'lib/celluloid_pubsub/client.rb', line 35

def initialize(options)
  @options = options.stringify_keys!
  @actor ||= @options.fetch('actor', nil)
  @channel ||= @options.fetch('channel', nil)
  raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
  supervise_actors
  setup_celluloid_logger
end
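The option handling in the constructor can be sketched in plain Ruby. This is only an illustration, not the library's code: `normalize_options` is a hypothetical helper, and `transform_keys(&:to_s)` plus a nil/empty check stand in for ActiveSupport's `stringify_keys!` and `blank?` used in the source above.

```ruby
# Hypothetical helper mirroring the option handling in #initialize.
# Core Ruby only; the real class relies on ActiveSupport extensions.
def normalize_options(options)
  opts = options.transform_keys(&:to_s) # symbol and string keys both work
  actor = opts.fetch('actor', nil)
  # Rough approximation of `blank?`: nil or an empty string/collection.
  if actor.nil? || (actor.respond_to?(:empty?) && actor.empty?)
    raise 'Please provide an actor in the options list'
  end
  { actor: actor, channel: opts.fetch('channel', nil) }
end

listener = Object.new # placeholder for the actor that owns the client
result = normalize_options(actor: listener, 'channel' => 'news')
```

Calling the helper without an `:actor` entry raises a `RuntimeError`, which matches the behaviour of the `raise` line in the constructor.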

Instance Attribute Details

#actor ⇒ Celluloid::Actor

Returns the actor to which callbacks are delegated.

Returns:

  • (Celluloid::Actor)

    the actor to which callbacks are delegated



# File 'lib/celluloid_pubsub/client.rb', line 15

class Client
  include CelluloidPubsub::BaseActor


  attr_accessor :actor, :options, :channel
  finalizer :shutdown
  # Receives a hash of options used to connect to the webserver, along with an actor to which callbacks are delegated
  # when messages are received from a channel.
  #
  # @param  [Hash]  options the options that can be used to connect to the webserver and send additional data
  # @option options [String] :actor The actor that made the connection
  # @option options [String] :channel The channel to which the client subscribes once the connection is open
  # @option options [String] :log_file_path The path to the log file where debug messages are written; STDOUT is used if it is not set
  # @option options [String] :hostname The hostname on which the webserver runs
  # @option options [String] :port The port on which the webserver runs
  # @option options [String] :path The request path that the webserver accepts
  #
  # @return [void]
  #
  # @api public
  def initialize(options)
    @options = options.stringify_keys!
    @actor ||= @options.fetch('actor', nil)
    @channel ||= @options.fetch('channel', nil)
    raise "#{self}: Please provide an actor in the options list!!!" if @actor.blank?
    supervise_actors
    setup_celluloid_logger
  end

  # the method will return the path to the log file where debug messages will be printed
  #
  # @return [String, nil] return the path to the log file where debug messages will be printed
  #
  # @api public
  def log_file_path
    @log_file_path ||= @options.fetch('log_file_path', nil)
  end

  # the method will link the current actor to the actor that is attached to, and the connection to the current actor
  #
  # @return [void]
  #
  # @api public
  def supervise_actors
    current_actor = Actor.current
    @actor.link current_actor if @actor.respond_to?(:link)
    current_actor.link connection
  end

  # the method will return the WebSocket client that is used to connect to the server
  #
  #
  # @return [Celluloid::WebSocket::Client] the websocket connection used to connect to server
  #
  # @api public
  def connection
    @connection ||= Celluloid::WebSocket::Client.new("ws://#{hostname}:#{port}#{path}", Actor.current)
  end

  # the method will return the hostname of the server
  #
  #
  # @return [String] the hostname where the server runs on
  #
  # @api public
  def hostname
    @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
  end

  # the method will return the port on which the server accepts connections
  #
  #
  # @return [String] the port on which the server accepts connections
  #
  # @api public
  def port
    @port ||= @options.fetch('port', CelluloidPubsub::WebServer::PORT)
  end

  # the method will return the path of the URL on which the server accepts the connection
  #
  #
  # @return [String] the URL path that the server is mounted on
  #
  # @api public
  def path
    @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
  end

  # the method will terminate the current actor
  #
  #
  # @return [void]
  #
  # @api public
  def shutdown
    log_debug "#{self.class} tries to 'shudown'"
    terminate
  end

  #  checks if debug is enabled
  #
  #
  # @return [boolean]
  #
  # @api public
  def debug_enabled?
    @options.fetch('enable_debug', false).to_s == 'true'
  end

  # subscribes to a channel. It needs to be called inside the connect block passed to the actor
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def subscribe(channel, data = {})
    log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
    async.send_action('subscribe', channel, data)
  end

  # publishes to a channel some data (can be anything)
  #
  # @param [string] channel
  # @param [#to_s] data
  #
  # @return [void]
  #
  # @api public
  def publish(channel, data)
    send_action('publish', channel, data)
  end

  # unsubscribes current client from a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe(channel)
    send_action('unsubscribe', channel)
  end

  # unsubscribes all clients subscribed to a channel
  #
  # @param [string] channel
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_clients(channel)
    send_action('unsubscribe_clients', channel)
  end

  # unsubscribes all clients from all channels
  #
  # @return [void]
  #
  # @api public
  def unsubscribe_all
    send_action('unsubscribe_all')
  end

  #  callback executes after connection is opened and delegates action to actor
  #
  # @return [void]
  #
  # @api public
  def on_open
    log_debug("#{@actor.class} websocket connection opened")
    async.subscribe(@channel) if @channel.present?
  end

  # callback executes when actor receives a message from a subscribed channel
  # and parses the message using JSON.parse and dispatches the parsed
  # message to the original actor that made the connection
  #
  # @param [JSON] data
  #
  # @return [void]
  #
  # @api public
  def on_message(data)
    message = JSON.parse(data)
    log_debug("#{@actor.class} received JSON  #{message}")
    if @actor.respond_to?(:async)
      @actor.async.on_message(message)
    else
      @actor.on_message(message)
    end
  end

  # callback executes when connection closes
  #
  # @param [String] code
  #
  # @param [String] reason
  #
  # @return [void]
  #
  # @api public
  def on_close(code, reason)
    connection.terminate
    terminate
    log_debug("#{@actor.class} dispatching on close  #{code} #{reason}")
    if @actor.respond_to?(:async)
      @actor.async.on_close(code, reason)
    else
      @actor.on_close(code, reason)
    end
  end

private

  # method used to send an action to the webserver reactor, for a channel and with data
  #
  # @param [String] action
  # @param [String] channel
  # @param [Hash] data
  #
  # @return [void]
  #
  # @api private
  def send_action(action, channel = nil, data = {})
    data = data.is_a?(Hash) ? data : {}
    publishing_data = { 'client_action' => action, 'channel' => channel, 'data' => data }.reject { |_key, value| value.blank? }
    async.chat(publishing_data)
  end

  # method used to send messages to the webserver
  # checks to see if the message is a Hash; if it is, it is converted to JSON and sent to the webserver,
  # otherwise a JSON object is built with the key action set to 'message' and the key message set to the parameter's value
  #
  # @param [Hash] message
  #
  # @return [void]
  #
  # @api private
  def chat(message)
    final_message = nil
    if message.is_a?(Hash)
      final_message = message.to_json
    else
      final_message = JSON.dump(action: 'message', message: message)
    end
    log_debug("#{@actor.class} sends JSON #{final_message}")
    connection.text final_message
  end
end
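The encoding rule described for the private `chat` method can be tried out in isolation. The following is a minimal sketch under the assumption that only the JSON encoding matters; `encode_outgoing` is a hypothetical name, and nothing is sent over a socket here.

```ruby
require 'json'

# Hypothetical stand-in for the encoding step of the private #chat method.
def encode_outgoing(message)
  if message.is_a?(Hash)
    # Hashes are serialized as they are.
    JSON.generate(message)
  else
    # Anything else is wrapped in an envelope with the action 'message'.
    JSON.generate({ 'action' => 'message', 'message' => message })
  end
end

hash_frame = encode_outgoing({ 'client_action' => 'subscribe', 'channel' => 'news' })
text_frame = encode_outgoing('ping')
```

Parsing `text_frame` back with `JSON.parse` yields a hash with the keys `action` and `message`, which is the shape the comment on `chat` describes.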

#channel ⇒ String

Returns the channel to which the client will subscribe.

Returns:

  • (String)

    the channel to which the client will subscribe




#options ⇒ Hash

Returns the options that can be used to connect to the webserver and send additional data.

Returns:

  • (Hash)

    the options that can be used to connect to the webserver and send additional data




Instance Method Details

#connection ⇒ Celluloid::WebSocket::Client

Returns the WebSocket client that is used to connect to the server.

Returns:

  • (Celluloid::WebSocket::Client)

    the websocket connection used to connect to server



# File 'lib/celluloid_pubsub/client.rb', line 70

def connection
  @connection ||= Celluloid::WebSocket::Client.new("ws://#{hostname}:#{port}#{path}", Actor.current)
end
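The URL passed to the WebSocket client is assembled from the hostname, port and path, each falling back to a server default. A small sketch of that assembly follows. The default values in `default_endpoint` are placeholders chosen for illustration; the real ones live in the `CelluloidPubsub::WebServer` constants `HOST`, `PORT` and `PATH`, whose values are not shown on this page.

```ruby
# Placeholder defaults (assumed values, not the library's constants).
def default_endpoint
  { 'hostname' => '0.0.0.0', 'port' => 1234, 'path' => '/ws' }
end

# Builds the ws:// URL the same way the interpolation in #connection does.
def websocket_url(options)
  opts = default_endpoint.merge(options.transform_keys(&:to_s))
  "ws://#{opts['hostname']}:#{opts['port']}#{opts['path']}"
end

url = websocket_url(hostname: 'example.test', port: 8080)
```

Because the path is appended without a separator, it is expected to start with a slash.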

#debug_enabled? ⇒ Boolean

Checks whether debug is enabled.

Returns:

  • (Boolean)


# File 'lib/celluloid_pubsub/client.rb', line 121

def debug_enabled?
  @options.fetch('enable_debug', false).to_s == 'true'
end
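Because the option value is converted with `to_s` before the comparison, both the boolean `true` and the string `'true'` enable debugging, while other truthy-looking values do not. A short sketch, where `debug_flag?` is a hypothetical free-standing copy of the check that takes an already string-keyed hash:

```ruby
# Hypothetical copy of the check in #debug_enabled?, taking the options hash directly.
def debug_flag?(options)
  options.fetch('enable_debug', false).to_s == 'true'
end

from_boolean = debug_flag?({ 'enable_debug' => true })
from_string  = debug_flag?({ 'enable_debug' => 'true' })
from_upper   = debug_flag?({ 'enable_debug' => 'TRUE' }) # comparison is case-sensitive
from_missing = debug_flag?({})
```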

#hostname ⇒ String

Returns the hostname of the server.

Returns:

  • (String)

    the hostname on which the server runs



# File 'lib/celluloid_pubsub/client.rb', line 80

def hostname
  @hostname ||= @options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
end

#log_file_path ⇒ String?

Returns the path to the log file where debug messages are written.

Returns:

  • (String, nil)

    the path to the log file where debug messages are written, or nil if none was configured



# File 'lib/celluloid_pubsub/client.rb', line 49

def log_file_path
  @log_file_path ||= @options.fetch('log_file_path', nil)
end

#on_close(code, reason) ⇒ void

This method returns an undefined value.

callback executes when connection closes

Parameters:

  • code (String)
  • reason (String)


# File 'lib/celluloid_pubsub/client.rb', line 218

def on_close(code, reason)
  connection.terminate
  terminate
  log_debug("#{@actor.class} dispatching on close  #{code} #{reason}")
  if @actor.respond_to?(:async)
    @actor.async.on_close(code, reason)
  else
    @actor.on_close(code, reason)
  end
end

#on_message(data) ⇒ void

This method returns an undefined value.

Callback executed when the client receives a message from a subscribed channel. It parses the message with JSON.parse and dispatches the parsed message to the original actor that made the connection.

Parameters:

  • data (JSON)


# File 'lib/celluloid_pubsub/client.rb', line 199

def on_message(data)
  message = JSON.parse(data)
  log_debug("#{@actor.class} received JSON  #{message}")
  if @actor.respond_to?(:async)
    @actor.async.on_message(message)
  else
    @actor.on_message(message)
  end
end
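The dispatch logic above (parse the frame, then hand it to the owner asynchronously when possible) can be exercised without Celluloid. In this sketch `dispatch_message` is a hypothetical helper and the listener is a plain object, so it takes the synchronous branch; a Celluloid actor would respond to `async` and take the other one.

```ruby
require 'json'

# Hypothetical helper mirroring the body of #on_message.
def dispatch_message(listener, raw)
  message = JSON.parse(raw)
  # Use the async proxy when the listener offers one, else call it directly.
  target = listener.respond_to?(:async) ? listener.async : listener
  target.on_message(message)
end

# A plain object standing in for the actor; it records what it receives.
received = []
listener = Object.new
listener.define_singleton_method(:on_message) { |message| received << message }

dispatch_message(listener, '{"channel":"news","data":{"id":1}}')
```

The listener receives a Ruby hash with string keys rather than the raw JSON text, so the owning actor never has to parse frames itself.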

#on_open ⇒ void

This method returns an undefined value.

Callback executed after the connection is opened. It subscribes to the configured channel, if one was given.



# File 'lib/celluloid_pubsub/client.rb', line 185

def on_open
  log_debug("#{@actor.class} websocket connection opened")
  async.subscribe(@channel) if @channel.present?
end

#path ⇒ String

Returns the path of the URL on which the server accepts the connection.

Returns:

  • (String)

    the URL path that the server is mounted on



# File 'lib/celluloid_pubsub/client.rb', line 100

def path
  @path ||= @options.fetch('path', CelluloidPubsub::WebServer::PATH)
end

#port ⇒ String

Returns the port on which the server accepts connections.

Returns:

  • (String)

    the port on which the server accepts connections



# File 'lib/celluloid_pubsub/client.rb', line 90

def port
  @port ||= @options.fetch('port', CelluloidPubsub::WebServer::PORT)
end

#publish(channel, data) ⇒ void

This method returns an undefined value.

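Publishes data to a channel. In the class source shown above, the private send_action helper replaces any data that is not a Hash with an empty hash, so the data should be passed as a Hash.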

Parameters:

  • channel (string)
  • data (#to_s)


# File 'lib/celluloid_pubsub/client.rb', line 145

def publish(channel, data)
  send_action('publish', channel, data)
end
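What `publish` actually sends depends on the private `send_action` helper, which drops non-Hash data and removes blank entries from the payload. The sketch below reproduces that filtering with core Ruby only. `build_payload` and `blank_value?` are hypothetical names, and `blank_value?` is only an approximation of ActiveSupport's `blank?` (it does not treat whitespace-only strings as blank).

```ruby
# Approximation of ActiveSupport's `blank?` for the values used here.
def blank_value?(value)
  value.nil? || (value.respond_to?(:empty?) && value.empty?)
end

# Hypothetical helper mirroring the payload construction in #send_action.
def build_payload(action, channel = nil, data = {})
  data = {} unless data.is_a?(Hash) # non-Hash data is discarded
  payload = { 'client_action' => action, 'channel' => channel, 'data' => data }
  payload.reject { |_key, value| blank_value?(value) }
end

with_hash   = build_payload('publish', 'news', { 'id' => 1 })
with_string = build_payload('publish', 'news', 'hello')
no_channel  = build_payload('unsubscribe_all')
```

In `with_string` the `'data'` key is missing entirely, which illustrates why a bare string passed to `publish` would not reach subscribers under this logic.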

#shutdown ⇒ void

This method returns an undefined value.

Terminates the current actor.



# File 'lib/celluloid_pubsub/client.rb', line 110

def shutdown
  log_debug "#{self.class} tries to 'shudown'"
  terminate
end

#subscribe(channel, data = {}) ⇒ void

This method returns an undefined value.

Subscribes to a channel. It needs to be called inside the connect block passed to the actor.

Parameters:

  • channel (string)
  • data (Hash) (defaults to: {})


# File 'lib/celluloid_pubsub/client.rb', line 132

def subscribe(channel, data = {})
  log_debug("#{@actor.class} tries to subscribe to channel  #{channel}")
  async.send_action('subscribe', channel, data)
end

#supervise_actors ⇒ void

This method returns an undefined value.

Links the actor that made the connection to the current actor, and links the current actor to the WebSocket connection.



# File 'lib/celluloid_pubsub/client.rb', line 58

def supervise_actors
  current_actor = Actor.current
  @actor.link current_actor if @actor.respond_to?(:link)
  current_actor.link connection
end

#unsubscribe(channel) ⇒ void

This method returns an undefined value.

unsubscribes current client from a channel

Parameters:

  • channel (string)


# File 'lib/celluloid_pubsub/client.rb', line 156

def unsubscribe(channel)
  send_action('unsubscribe', channel)
end

#unsubscribe_all ⇒ void

This method returns an undefined value.

unsubscribes all clients from all channels



# File 'lib/celluloid_pubsub/client.rb', line 176

def unsubscribe_all
  send_action('unsubscribe_all')
end

#unsubscribe_clients(channel) ⇒ void

This method returns an undefined value.

unsubscribes all clients subscribed to a channel

Parameters:

  • channel (string)


# File 'lib/celluloid_pubsub/client.rb', line 167

def unsubscribe_clients(channel)
  send_action('unsubscribe_clients', channel)
end