Class: CelluloidPubsub::WebServer

Inherits:
Reel::Server::HTTP
  • Object
show all
Includes:
BaseActor
Defined in:
lib/celluloid_pubsub/web_server.rb

Overview

webserver to which socket connects should connect to . the server will dispatch each request into a new Reactor which will handle the action based on the message

Constant Summary collapse

HOST =

The hostname on which the webserver runs on by default

'0.0.0.0'
PATH =

The request path that the webserver accepts by default

'/ws'
CLASSIC_ADAPTER =

The name of the default adapter

'classic'

Instance Attribute Summary collapse

Attributes included from BaseActor

#config

Class Method Summary collapse

Instance Method Summary collapse

Methods included from BaseActor

boot_up, celluloid_logger_class, celluloid_version, config, included, setup_actor_supervision, version_less_than_eigthteen?, version_less_than_seventeen?

Methods included from Helper

action_subscribe?, #actor_dead?, #cell_actor, fetch_gem_version, filtered_error?, find_loaded_gem, find_loaded_gem_property, get_parsed_version, log_debug, #own_self, parse_options, setup_celluloid_exception_handler, setup_celluloid_logger, setup_log_file, #succesfull_subscription?, verify_gem_version

Constructor Details

#initialize(options = {}) ⇒ void

receives a list of options that are used to configure the webserver

:nocov:

Parameters:

  • options (Hash) (defaults to: {})

    the options that can be used to connect to webser and send additional data

Options Hash (options):

  • :hostname (String)

    The hostname on which the webserver runs on

  • :port (Integer)

    The port on which the webserver runs on

  • :path (String)

    The request path that the webserver accepts

  • :spy (Boolean)

    Enable this only if you want to enable debugging for the webserver



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/celluloid_pubsub/web_server.rb', line 52

def initialize(options = {})
  Celluloid.boot unless Celluloid.running?
  @server_options = parse_options(options)
  @subscribers = {}
  @mutex = Mutex.new
  @timers_mutex = Mutex.new
  @shutting_down = false
  @reactors = []
  setup_celluloid_logger
  log_debug "CelluloidPubsub::WebServer example starting on #{hostname}:#{port}"
  super(hostname, port, { spy: spy, backlog: backlog }, &method(:on_connection))
end

Instance Attribute Details

#mutexMutex, Object

Returns:

  • (Mutex)

    The mutex that will synchronize actions on subscribers

  • (Object)

    the current value of mutex



23
24
25
# File 'lib/celluloid_pubsub/web_server.rb', line 23

def mutex
  @mutex
end

#reactorsObject (readonly)

Returns the value of attribute reactors.



35
36
37
# File 'lib/celluloid_pubsub/web_server.rb', line 35

def reactors
  @reactors
end

#server_optionsHash, Object

Returns:

  • (Hash)

    options used to configure the webserver

  • (Object)

    the current value of server_options



23
24
25
# File 'lib/celluloid_pubsub/web_server.rb', line 23

def server_options
  @server_options
end

#subscribersHash, Object

Returns:

  • (Hash)

    The hostname on which the webserver runs on

  • (Object)

    the current value of subscribers



23
24
25
# File 'lib/celluloid_pubsub/web_server.rb', line 23

def subscribers
  @subscribers
end

#timers_mutexMutex, Object

Returns:

  • (Mutex)

    The mutex that will synchronize actions on registry messages

  • (Object)

    the current value of timers_mutex



23
24
25
# File 'lib/celluloid_pubsub/web_server.rb', line 23

def timers_mutex
  @timers_mutex
end

Class Method Details

.find_unused_portInteger

the method get from the socket connection that is already opened the port used.

Returns:

  • (Integer)

    returns the port that can be used to issue new connection

See Also:

  • #open_socket_on_unused_port


104
105
106
107
108
109
110
111
# File 'lib/celluloid_pubsub/web_server.rb', line 104

def self.find_unused_port
  @@unused_port ||= begin
    socket = open_socket_on_unused_port
    port = socket.addr[1]
    socket.close
    port
  end
end

.open_socket_on_unused_portTCPServer

the method will return the socket conection opened on the unused port

Returns:

  • (TCPServer)

    return the socket connection opened on a random port



71
72
73
74
75
# File 'lib/celluloid_pubsub/web_server.rb', line 71

def self.open_socket_on_unused_port
  return ::TCPServer.open('0.0.0.0', 0) if socket_families.key?('AF_INET')
  return ::TCPServer.open('::', 0) if socket_families.key?('AF_INET6')
  ::TCPServer.open(0)
end

.socket_familiesHash

the method will return the socket families avaiable

rubocop:disable Style/ClassVars

Returns:

  • (Hash)

    return the socket families available as keys in the hash



94
95
96
# File 'lib/celluloid_pubsub/web_server.rb', line 94

def self.socket_families
  @@socket_families ||= Hash[*socket_infos.map { |af, *_| af }.uniq.zip([]).flatten]
end

.socket_infosArray

the method will return the socket information available as an array

Returns:

  • (Array)

    return the socket information available as an array



83
84
85
# File 'lib/celluloid_pubsub/web_server.rb', line 83

def self.socket_infos
  ::Socket::getaddrinfo('localhost', nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM, 0, Socket::AI_PASSIVE)
end

Instance Method Details

#actor_died(actor, reason) ⇒ void

This method returns an undefined value.

method called when the actor is exiting

Parameters:

  • actor (actor)
    • the current actor

  • reason (Hash)
    • the reason it crashed



386
387
388
389
# File 'lib/celluloid_pubsub/web_server.rb', line 386

def actor_died(actor, reason)
  @shutting_down = true
  log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
end

#adapterBoolean

the method will return true if redis can be used otherwise false

Returns:

  • (Boolean)

    return true if redis can be used otherwise false



144
145
146
147
# File 'lib/celluloid_pubsub/web_server.rb', line 144

def adapter
  @adapter ||= @server_options.fetch('adapter', CelluloidPubsub::WebServer::CLASSIC_ADAPTER)
  @adapter.present? ? @adapter : CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end

#backlogInteger

the method will return the number of connections allowed to the server

Returns:

  • (Integer)

    returns the number of connections allowed to the server



250
251
252
# File 'lib/celluloid_pubsub/web_server.rb', line 250

def backlog
  @backlog = @server_options.fetch('backlog', 1024)
end

#bind_timers(run = false) ⇒ void

This method returns an undefined value.

the method will run indefinitely and will check if are there any unpublished messages that can be send to new subscribers

Parameters:

  • run (Boolean) (defaults to: false)

    FLag to control if the server should try checking if there are any unpublished messages that need to be sent



133
134
135
136
# File 'lib/celluloid_pubsub/web_server.rb', line 133

def bind_timers(run = false)
  try_sending_unpublished if run
  after(0.1) { bind_timers(true) }
end

#debug_enabled?Boolean

the method will return true if debug is enabled otherwise false

Returns:

  • (Boolean)

    returns true if debug is enabled otherwise false



165
166
167
168
# File 'lib/celluloid_pubsub/web_server.rb', line 165

def debug_enabled?
  @debug_enabled = @server_options.fetch('enable_debug', true)
  @debug_enabled == true
end

#dispatch_websocket_request(request) ⇒ void

This method returns an undefined value.

method will instantiate a new reactor object, will link the reactor to the current actor and will dispatch the request to the reactor

Parameters:

  • request (Reel::WebSocket)

    The request that was made to the webserver

See Also:



306
307
308
309
310
# File 'lib/celluloid_pubsub/web_server.rb', line 306

def dispatch_websocket_request(request)
  reactor = reactor_class.new
  Actor.current.link reactor
  route_websocket(reactor, request.websocket)
end

#handle_dispatched_message(reactor, data) ⇒ void

This method returns an undefined value.

If the message can be parsed into a Hash it will respond to the reactor’s websocket connection with the same message in JSON format otherwise will try send the message how it is and escaped into JSON format

Parameters:

  • reactor (CelluloidPubsub::Reactor)

    The reactor that received an unhandled message

  • data (Object)

    The message that the reactor could not handle



371
372
373
374
375
376
# File 'lib/celluloid_pubsub/web_server.rb', line 371

def handle_dispatched_message(reactor, data)
  log_debug "#{self.class} trying to dispatch message  #{data.inspect}"
  message = reactor.parse_json_data(data)
  final_data = message.present? && message.is_a?(Hash) ? message.to_json : data.to_json
  reactor.websocket << final_data
end

#hostnameString

the method will return the hostname on which the server is running on

Returns:

  • (String)

    returns the hostname on which the server is running on



210
211
212
# File 'lib/celluloid_pubsub/web_server.rb', line 210

def hostname
  @hostname = @server_options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
end

#log_file_pathString

the method will return the file path of the log file where debug messages will be printed

Returns:

  • (String)

    returns the file path of the log file where debug messages will be printed



191
192
193
# File 'lib/celluloid_pubsub/web_server.rb', line 191

def log_file_path
  @log_file_path = @server_options.fetch('log_file_path', nil)
end

#log_levelInteger?

the method will return the log level of the logger

Returns:

  • (Integer, nil)

    return the log level used by the logger ( default is 1 - info)



200
201
202
# File 'lib/celluloid_pubsub/web_server.rb', line 200

def log_level
  @log_level ||= @server_options['log_level'] || ::Logger::Severity::INFO
end

#on_connection(connection) ⇒ void

This method returns an undefined value.

callback that will execute when receiving new conections If the connections is a websocket will call method #route_websocket and if the connection is HTTP will call method #route_request For websocket connections , the connection is detached from the server and dispatched to another actor

Parameters:

  • connection (Reel::WebSocket)

    The connection that was made to the webserver

See Also:



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/celluloid_pubsub/web_server.rb', line 267

def on_connection(connection)
  while request = connection.request
    if request.websocket?
      log_debug "#{self.class} Received a WebSocket connection #{request.websocket.url}"

      # We're going to hand off this connection to another actor (Writer/Reader)
      # However, initially Reel::Connections are "attached" to the
      # Reel::Server::HTTP actor, meaning that the server manages the connection
      # lifecycle (e.g. error handling) for us.
      #
      # If we want to hand this connection off to another actor, we first
      # need to detach it from the Reel::Server (in this case, Reel::Server::HTTP)
      connection.detach
      dispatch_websocket_request(request)
      return
    else
      route_request connection, request
    end
  end
end

#pathString

the method will return the URL path on which will acceept connections

Returns:

  • (String)

    returns the URL path on which will acceept connections



230
231
232
# File 'lib/celluloid_pubsub/web_server.rb', line 230

def path
  @path = @server_options.fetch('path', CelluloidPubsub::WebServer::PATH)
end

#portString

the method will return the port on which will accept connections

Returns:

  • (String)

    returns the port on which will accept connections



220
221
222
# File 'lib/celluloid_pubsub/web_server.rb', line 220

def port
  @port ||= @server_options.fetch('port', nil) || self.class.find_unused_port
end

#reactor_classClass

returns the reactor class that will handle the connection depending if redis is enabled or not

Returns:

  • (Class)

    returns the reactor class that will handle the connection depending if redis is enabled or not

See Also:

  • #redis_enabled?


294
295
296
# File 'lib/celluloid_pubsub/web_server.rb', line 294

def reactor_class
  adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER ? CelluloidPubsub::Reactor : "CelluloidPubsub::#{adapter.camelize}Reactor".constantize
end

#route_request(connection, request) ⇒ void

This method returns an undefined value.

HTTP connections are not accepted so this method will show 404 message “Not Found”

Parameters:

  • connection (Reel::WebSocket)

    The HTTP connection that was received

  • request (Reel::Request)

    The request that was made to the webserver and contains the type , the url, and the parameters



320
321
322
323
# File 'lib/celluloid_pubsub/web_server.rb', line 320

def route_request(connection, request)
  log_debug "404 Not Found: #{request.path}"
  connection.respond :not_found, 'Not found'
end

#route_websocket(reactor, socket) ⇒ void

This method returns an undefined value.

If the socket url matches with the one accepted by the server, it will dispatch the socket connection to a new reactor Reactor#work The new actor is linked to the webserver



335
336
337
338
339
340
341
342
343
# File 'lib/celluloid_pubsub/web_server.rb', line 335

def route_websocket(reactor, socket)
  url = socket.url
  if url == path || url == '/?'
    reactor.async.work(socket, Actor.current)
  else
    log_debug "Received invalid WebSocket request for: #{url}"
    socket.close
  end
end

#runObject

this method is overriden from the Reel::Server::HTTP in order to set the spy to the celluloid logger before the connection is accepted.

See Also:

  • #handle_connection


118
119
120
121
122
# File 'lib/celluloid_pubsub/web_server.rb', line 118

def run
  @spy = Celluloid.logger if spy
  async.bind_timers
  loop { async.handle_connection @server.accept }
end

#shutdownvoid

This method returns an undefined value.

the method will terminate the current actor



176
177
178
179
180
181
182
183
# File 'lib/celluloid_pubsub/web_server.rb', line 176

def shutdown
  @shutting_down = true
  log_debug "#{self.class} tries to 'shutdown'"
  reactors.each do |reactor|
    reactor.terminate unless actor_dead?(reactor)
  end
  terminate
end

#shutting_down?Boolean

the method will return true if the actor is shutting down

Returns:

  • (Boolean)

    returns true if the actor is shutting down



155
156
157
# File 'lib/celluloid_pubsub/web_server.rb', line 155

def shutting_down?
  @shutting_down == true
end

#spyBoolean

the method will return true if connection to the server should be spied upon

Returns:

  • (Boolean)

    returns true if connection to the server should be spied upon, otherwise false



240
241
242
# File 'lib/celluloid_pubsub/web_server.rb', line 240

def spy
  @spy = @server_options.fetch('spy', false)
end

#try_sending_unpublishedvoid

This method returns an undefined value.

this method will know when a client has successfully registered and will write to the socket all messages that were published to that channel before the actor subscribed



352
353
354
355
356
357
358
359
360
# File 'lib/celluloid_pubsub/web_server.rb', line 352

def try_sending_unpublished
  CelluloidPubsub::Registry.messages.each_key do |channel|
    next if (clients = subscribers[channel]).blank?
    clients.dup.pmap do |hash|
      hash[:reactor].send_unpublished(channel)
    end
    clients.last[:reactor].clear_unpublished_messages(channel)
  end
end