Class: CelluloidPubsub::WebServer
- Inherits: Reel::Server::HTTP
- Inheritance chain: CelluloidPubsub::WebServer < Reel::Server::HTTP < Object
- Includes: BaseActor
- Defined in: lib/celluloid_pubsub/web_server.rb
Overview
Web server to which socket clients connect. The server dispatches each request to a new Reactor, which handles the action based on the message.
Constant Summary
- HOST = '0.0.0.0'
  The hostname on which the webserver runs by default
- PATH = '/ws'
  The request path that the webserver accepts by default
- CLASSIC_ADAPTER = 'classic'
  The name of the default adapter
Instance Attribute Summary
- #mutex ⇒ Mutex, Object
- #reactors ⇒ Object (readonly)
  Returns the value of attribute reactors.
- #server_options ⇒ Hash, Object
- #subscribers ⇒ Hash, Object
- #timers_mutex ⇒ Mutex, Object
Attributes included from BaseActor
Class Method Summary
- .find_unused_port ⇒ Integer
  Returns the port number of a socket opened on an unused port.
- .open_socket_on_unused_port ⇒ TCPServer
  Returns the socket connection opened on an unused port.
- .socket_families ⇒ Hash
  Returns the available socket families.
- .socket_infos ⇒ Array
  Returns the available socket information as an array.
Instance Method Summary
- #actor_died(actor, reason) ⇒ void
  Method called when the actor is exiting.
- #adapter ⇒ String
  Returns the name of the configured adapter, falling back to the default 'classic' adapter when none is configured.
- #backlog ⇒ Integer
  Returns the maximum number of pending connections the server allows.
- #bind_timers(run = false) ⇒ void
  Runs indefinitely, checking whether there are any unpublished messages that can be sent to new subscribers.
- #debug_enabled? ⇒ Boolean
  Returns true if debug is enabled, otherwise false.
- #dispatch_websocket_request(request) ⇒ void
  Instantiates a new reactor, links it to the current actor, and dispatches the request to the reactor.
- #handle_dispatched_message(reactor, data) ⇒ void
  If the message can be parsed into a Hash, responds to the reactor’s websocket connection with the same message in JSON format; otherwise sends the message as it is, escaped into JSON format.
- #hostname ⇒ String
  Returns the hostname on which the server is running.
- #initialize(options = {}) ⇒ void (constructor)
  Receives a list of options that are used to configure the webserver.
- #log_file_path ⇒ String
  Returns the path of the log file where debug messages will be printed.
- #log_level ⇒ Integer?
  Returns the log level of the logger.
- #on_connection(connection) ⇒ void
  Callback executed when a new connection is received. If the connection is a websocket, it is routed through #route_websocket; if it is plain HTTP, #route_request is called. Websocket connections are detached from the server and dispatched to another actor.
- #path ⇒ String
  Returns the URL path on which the server will accept connections.
- #port ⇒ String
  Returns the port on which the server will accept connections.
- #reactor_class ⇒ Class
  Returns the reactor class that will handle the connection, depending on whether redis is enabled or not.
- #route_request(connection, request) ⇒ void
  HTTP connections are not accepted, so this method responds with a 404 “Not Found” message.
- #route_websocket(reactor, socket) ⇒ void
  If the socket URL matches the one accepted by the server, dispatches the socket connection to a new reactor via Reactor#work. The new actor is linked to the webserver.
- #run ⇒ Object
  Overridden from Reel::Server::HTTP in order to set the spy to the Celluloid logger before the connection is accepted.
- #shutdown ⇒ void
  Terminates the current actor.
- #shutting_down? ⇒ Boolean
  Returns true if the actor is shutting down.
- #spy ⇒ Boolean
  Returns true if connections to the server should be spied upon.
- #try_sending_unpublished ⇒ void
  Detects when a client has successfully registered and writes to the socket all messages that were published to that channel before the actor subscribed.
Methods included from BaseActor
boot_up, celluloid_logger_class, celluloid_version, config, included, setup_actor_supervision, version_less_than_eigthteen?, version_less_than_seventeen?
Methods included from Helper
action_subscribe?, #actor_dead?, #cell_actor, fetch_gem_version, filtered_error?, find_loaded_gem, find_loaded_gem_property, get_parsed_version, log_debug, #own_self, parse_options, setup_celluloid_exception_handler, setup_celluloid_logger, setup_log_file, #succesfull_subscription?, verify_gem_version
Constructor Details
#initialize(options = {}) ⇒ void
receives a list of options that are used to configure the webserver
# File 'lib/celluloid_pubsub/web_server.rb', line 52
def initialize(options = {})
  Celluloid.boot unless Celluloid.running?
  @server_options = parse_options(options)
  @subscribers = {}
  @mutex = Mutex.new
  @timers_mutex = Mutex.new
  @shutting_down = false
  @reactors = []
  setup_celluloid_logger
  log_debug "CelluloidPubsub::WebServer example starting on #{hostname}:#{port}"
  super(hostname, port, { spy: spy, backlog: backlog }, &method(:on_connection))
end
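The accessor methods documented on this page read their settings from string keys of the options hash, each with a fallback. The following is a minimal sketch in plain Ruby of how those defaults combine with caller-supplied values. The names `DEFAULT_SERVER_OPTIONS`, `custom`, and `effective` are hypothetical, the values `1234` and `'/pubsub'` are illustrative, and whether the constructor also accepts symbol keys depends on the option-parsing step, which is not shown in the listing.

```ruby
require 'logger'

# Defaults as they appear in the accessor methods documented on this page.
# The constant name is hypothetical; the gem does not define it.
DEFAULT_SERVER_OPTIONS = {
  'hostname'      => '0.0.0.0',               # HOST
  'path'          => '/ws',                   # PATH
  'adapter'       => 'classic',               # CLASSIC_ADAPTER
  'backlog'       => 1024,
  'spy'           => false,
  'enable_debug'  => true,
  'log_file_path' => nil,
  'log_level'     => Logger::Severity::INFO,
  'port'          => nil                      # nil means "pick an unused port"
}.freeze

# Illustrative caller-supplied options; only overridden keys are listed.
custom = { 'port' => 1234, 'path' => '/pubsub', 'enable_debug' => false }

# Caller values win over defaults, mirroring Hash#fetch with a fallback.
effective = DEFAULT_SERVER_OPTIONS.merge(custom)
effective.each { |key, value| puts "#{key}: #{value.inspect}" }
```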
Instance Attribute Details
#mutex ⇒ Mutex, Object
# File 'lib/celluloid_pubsub/web_server.rb', line 23
def mutex
  @mutex
end
#reactors ⇒ Object (readonly)
Returns the value of attribute reactors.
# File 'lib/celluloid_pubsub/web_server.rb', line 35
def reactors
  @reactors
end
#server_options ⇒ Hash, Object
# File 'lib/celluloid_pubsub/web_server.rb', line 23
def server_options
  @server_options
end
#subscribers ⇒ Hash, Object
# File 'lib/celluloid_pubsub/web_server.rb', line 23
def subscribers
  @subscribers
end
#timers_mutex ⇒ Mutex, Object
# File 'lib/celluloid_pubsub/web_server.rb', line 23
def timers_mutex
  @timers_mutex
end
Class Method Details
.find_unused_port ⇒ Integer
Returns the port number of a socket opened on an unused port. The socket is closed before the method returns, and the result is memoized in a class variable.
# File 'lib/celluloid_pubsub/web_server.rb', line 104
def self.find_unused_port
  @@unused_port ||= begin
    socket = open_socket_on_unused_port
    port = socket.addr[1]
    socket.close
    port
  end
end
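The technique used here, binding to port 0 so that the operating system picks a free port, can be tried on its own with the standard library. This is a minimal sketch; `free_port` is a hypothetical helper and not part of CelluloidPubsub, and it binds to the loopback address rather than '0.0.0.0'.

```ruby
require 'socket'

# Hypothetical helper: asks the OS for a currently unused TCP port.
def free_port(host = '127.0.0.1')
  server = TCPServer.new(host, 0) # port 0 lets the OS choose the port
  server.addr[1]                  # addr is [family, port, hostname, ip]
ensure
  server&.close                   # release the port so a real server can bind it
end

port = free_port
puts "OS assigned port #{port}"
```

Because the probing socket is closed before the port is reused, another process could in principle claim the port in between. The memoization in the listing also means every call within one process returns the same port.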
.open_socket_on_unused_port ⇒ TCPServer
Returns the socket connection opened on an unused port.
# File 'lib/celluloid_pubsub/web_server.rb', line 71
def self.open_socket_on_unused_port
  return ::TCPServer.open('0.0.0.0', 0) if socket_families.key?('AF_INET')
  return ::TCPServer.open('::', 0) if socket_families.key?('AF_INET6')
  ::TCPServer.open(0)
end
.socket_families ⇒ Hash
Returns the available socket families.
# File 'lib/celluloid_pubsub/web_server.rb', line 94
def self.socket_families
  @@socket_families ||= Hash[*socket_infos.map { |af, *_| af }.uniq.zip([]).flatten]
end
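The one-liner above builds a lookup table whose keys are the address families and whose values are all nil. A small sketch with hand-written sample tuples (shaped like `Socket.getaddrinfo` results, with illustrative values) shows each step; the variable names are hypothetical.

```ruby
# Sample tuples shaped like Socket.getaddrinfo results (values are illustrative).
infos = [
  ['AF_INET6', 0, 'localhost', '::1',       10, 1, 6],
  ['AF_INET',  0, 'localhost', '127.0.0.1', 2,  1, 6],
  ['AF_INET',  0, 'localhost', '127.0.0.1', 2,  1, 6]
]

# Keep only the first element of each tuple and drop duplicates.
families = infos.map { |af, *_| af }.uniq

# zip([]) pairs every family with nil; Hash[*flat_list] turns the pairs into a Hash.
lookup = Hash[*families.zip([]).flatten]

# An equivalent and more direct spelling (Ruby 2.6+).
same = families.to_h { |af| [af, nil] }

puts lookup.inspect
puts lookup.key?('AF_INET')
```

Only the keys matter to callers such as `open_socket_on_unused_port`, which tests membership with `key?`.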
.socket_infos ⇒ Array
the method will return the socket information available as an array
# File 'lib/celluloid_pubsub/web_server.rb', line 83
def self.socket_infos
  ::Socket::getaddrinfo('localhost', nil, Socket::AF_UNSPEC, Socket::SOCK_STREAM, 0, Socket::AI_PASSIVE)
end
Instance Method Details
#actor_died(actor, reason) ⇒ void
This method returns an undefined value.
method called when the actor is exiting
# File 'lib/celluloid_pubsub/web_server.rb', line 386
def actor_died(actor, reason)
  @shutting_down = true
  log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
end
#adapter ⇒ String
Returns the name of the configured adapter, falling back to the default 'classic' adapter when none is configured.
# File 'lib/celluloid_pubsub/web_server.rb', line 144
def adapter
  @adapter ||= @server_options.fetch('adapter', CelluloidPubsub::WebServer::CLASSIC_ADAPTER)
  @adapter.present? ? @adapter : CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end
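The fallback has two layers: a missing 'adapter' key and a blank value both resolve to the classic adapter. The sketch below reproduces that behaviour without ActiveSupport's `present?`. `resolve_adapter` is a hypothetical helper, and 'redis' is used only as an example adapter name.

```ruby
CLASSIC_ADAPTER = 'classic'

# Hypothetical stand-in for the adapter lookup, written without ActiveSupport.
def resolve_adapter(server_options)
  name = server_options.fetch('adapter', CLASSIC_ADAPTER)
  blank = name.nil? || name.to_s.strip.empty?
  blank ? CLASSIC_ADAPTER : name
end

puts resolve_adapter({})                   # key missing
puts resolve_adapter('adapter' => '')      # blank value
puts resolve_adapter('adapter' => 'redis') # explicit adapter name
```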
#backlog ⇒ Integer
Returns the maximum number of pending connections the server allows (1024 by default).
# File 'lib/celluloid_pubsub/web_server.rb', line 250
def backlog
  @backlog = @server_options.fetch('backlog', 1024)
end
#bind_timers(run = false) ⇒ void
This method returns an undefined value.
Runs indefinitely, rescheduling itself every 0.1 seconds, and checks whether there are any unpublished messages that can be sent to new subscribers.
# File 'lib/celluloid_pubsub/web_server.rb', line 133
def bind_timers(run = false)
  try_sending_unpublished if run
  after(0.1) { bind_timers(true) }
end
#debug_enabled? ⇒ Boolean
the method will return true if debug is enabled otherwise false
# File 'lib/celluloid_pubsub/web_server.rb', line 165
def debug_enabled?
  @debug_enabled = @server_options.fetch('enable_debug', true)
  @debug_enabled == true
end
#dispatch_websocket_request(request) ⇒ void
This method returns an undefined value.
method will instantiate a new reactor object, will link the reactor to the current actor and will dispatch the request to the reactor
# File 'lib/celluloid_pubsub/web_server.rb', line 306
def dispatch_websocket_request(request)
  reactor = reactor_class.new
  Actor.current.link reactor
  route_websocket(reactor, request.websocket)
end
#handle_dispatched_message(reactor, data) ⇒ void
This method returns an undefined value.
If the message can be parsed into a Hash, responds to the reactor’s websocket connection with the same message in JSON format; otherwise sends the message as it is, escaped into JSON format.
# File 'lib/celluloid_pubsub/web_server.rb', line 371
def handle_dispatched_message(reactor, data)
  log_debug "#{self.class} trying to dispatch message #{data.inspect}"
  message = reactor.parse_json_data(data)
  final_data = message.present? && message.is_a?(Hash) ? message.to_json : data.to_json
  reactor.websocket << final_data
end
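The two branches can be seen in isolation with the standard `json` library. This is a sketch under assumptions: `parse_json_data` here is a hypothetical stand-in that returns nil for invalid JSON (the real reactor method is not shown on this page), and `outgoing_payload` is a made-up name for the decision that picks the final payload.

```ruby
require 'json'

# Hypothetical stand-in for reactor.parse_json_data: returns the parsed value,
# or nil when the input is not valid JSON (an assumption about its behaviour).
def parse_json_data(data)
  JSON.parse(data)
rescue JSON::ParserError, TypeError
  nil
end

# Chooses what would be written to the websocket.
def outgoing_payload(data)
  parsed = parse_json_data(data)
  # Only a non-empty Hash is re-serialised; anything else is escaped as-is.
  parsed.is_a?(Hash) && !parsed.empty? ? parsed.to_json : data.to_json
end

puts outgoing_payload('{"channel":"news"}') # Hash branch
puts outgoing_payload('plain text')         # fallback branch, JSON string literal
puts outgoing_payload('[1,2]')              # valid JSON but not a Hash, also escaped
```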
#hostname ⇒ String
Returns the hostname on which the server is running.
# File 'lib/celluloid_pubsub/web_server.rb', line 210
def hostname
  @hostname = @server_options.fetch('hostname', CelluloidPubsub::WebServer::HOST)
end
#log_file_path ⇒ String
the method will return the file path of the log file where debug messages will be printed
# File 'lib/celluloid_pubsub/web_server.rb', line 191
def log_file_path
  @log_file_path = @server_options.fetch('log_file_path', nil)
end
#log_level ⇒ Integer?
the method will return the log level of the logger
# File 'lib/celluloid_pubsub/web_server.rb', line 200
def log_level
  @log_level ||= @server_options['log_level'] || ::Logger::Severity::INFO
end
#on_connection(connection) ⇒ void
This method returns an undefined value.
Callback executed when a new connection is received. If the connection is a websocket, it is routed through #route_websocket; if it is plain HTTP, #route_request is called. Websocket connections are detached from the server and dispatched to another actor.
# File 'lib/celluloid_pubsub/web_server.rb', line 267
def on_connection(connection)
  while request = connection.request
    if request.websocket?
      log_debug "#{self.class} Received a WebSocket connection #{request.websocket.url}"

      # We're going to hand off this connection to another actor (Writer/Reader)
      # However, initially Reel::Connections are "attached" to the
      # Reel::Server::HTTP actor, meaning that the server manages the connection
      # lifecycle (e.g. error handling) for us.
      #
      # If we want to hand this connection off to another actor, we first
      # need to detach it from the Reel::Server (in this case, Reel::Server::HTTP)
      connection.detach
      dispatch_websocket_request(request)
      return
    else
      route_request connection, request
    end
  end
end
#path ⇒ String
Returns the URL path on which the server will accept connections.
# File 'lib/celluloid_pubsub/web_server.rb', line 230
def path
  @path = @server_options.fetch('path', CelluloidPubsub::WebServer::PATH)
end
#port ⇒ String
Returns the port on which the server will accept connections. If no 'port' option is given, an unused port is looked up.
# File 'lib/celluloid_pubsub/web_server.rb', line 220
def port
  @port ||= @server_options.fetch('port', nil) || self.class.find_unused_port
end
#reactor_class ⇒ Class
Returns the reactor class that will handle the connection, depending on the configured adapter (for example, whether redis is enabled or not).
# File 'lib/celluloid_pubsub/web_server.rb', line 294
def reactor_class
  adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER ? CelluloidPubsub::Reactor : "CelluloidPubsub::#{adapter.camelize}Reactor".constantize
end
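The lookup turns an adapter name into a class name by convention. The sketch below shows the same idea in plain Ruby, replacing ActiveSupport's `camelize` and `constantize` with a simplified conversion and `const_get`. The `Demo` module, its classes, and `reactor_class_for` are placeholders invented so the example runs on its own.

```ruby
# Placeholder classes so the sketch is self-contained; the real ones live in the gem.
module Demo
  class Reactor; end
  class RedisReactor < Reactor; end
end

CLASSIC = 'classic'

# Hypothetical helper: maps an adapter name to a reactor class by naming convention.
def reactor_class_for(adapter)
  return Demo::Reactor if adapter == CLASSIC

  # 'redis' becomes 'Redis' (a simplified camelize for snake_case names).
  camelized = adapter.split('_').map(&:capitalize).join
  Demo.const_get("#{camelized}Reactor") # raises NameError for unknown adapters
end

puts reactor_class_for('classic')
puts reactor_class_for('redis')
```

One consequence of this convention is that an adapter name without a matching class fails only at lookup time, with a NameError.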
#route_request(connection, request) ⇒ void
This method returns an undefined value.
HTTP connections are not accepted so this method will show 404 message “Not Found”
# File 'lib/celluloid_pubsub/web_server.rb', line 320
def route_request(connection, request)
  log_debug "404 Not Found: #{request.path}"
  connection.respond :not_found, 'Not found'
end
#route_websocket(reactor, socket) ⇒ void
This method returns an undefined value.
If the socket URL matches the one accepted by the server, dispatches the socket connection to a new reactor via Reactor#work; otherwise the socket is closed. The new actor is linked to the webserver.
# File 'lib/celluloid_pubsub/web_server.rb', line 335
def route_websocket(reactor, socket)
  url = socket.url
  if url == path || url == '/?'
    reactor.async.work(socket, Actor.current)
  else
    log_debug "Received invalid WebSocket request for: #{url}"
    socket.close
  end
end
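The match is a plain string comparison, which is worth keeping in mind when choosing the URL a client connects to. The following sketch isolates that comparison; `accepted_url?` and `DEFAULT_PATH` are hypothetical names, and the sample URLs are illustrative.

```ruby
DEFAULT_PATH = '/ws' # the documented default of PATH

# Hypothetical predicate mirroring the comparison in route_websocket.
def accepted_url?(url, path = DEFAULT_PATH)
  url == path || url == '/?'
end

puts accepted_url?('/ws')         # exact match with the configured path
puts accepted_url?('/?')          # the special case in the listing
puts accepted_url?('/ws?token=1') # a URL carrying a query string is not equal to '/ws'
puts accepted_url?('/ws/')        # a trailing slash is not equal either
```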
#run ⇒ Object
Overridden from Reel::Server::HTTP in order to set the spy to the Celluloid logger before the connection is accepted.
# File 'lib/celluloid_pubsub/web_server.rb', line 118
def run
  @spy = Celluloid.logger if spy
  async.bind_timers
  loop { async.handle_connection @server.accept }
end
#shutdown ⇒ void
This method returns an undefined value.
the method will terminate the current actor
# File 'lib/celluloid_pubsub/web_server.rb', line 176
def shutdown
  @shutting_down = true
  log_debug "#{self.class} tries to 'shutdown'"
  reactors.each do |reactor|
    reactor.terminate unless actor_dead?(reactor)
  end
  terminate
end
#shutting_down? ⇒ Boolean
the method will return true if the actor is shutting down
# File 'lib/celluloid_pubsub/web_server.rb', line 155
def shutting_down?
  @shutting_down == true
end
#spy ⇒ Boolean
the method will return true if connection to the server should be spied upon
# File 'lib/celluloid_pubsub/web_server.rb', line 240
def spy
  @spy = @server_options.fetch('spy', false)
end
#try_sending_unpublished ⇒ void
This method returns an undefined value.
Detects when a client has successfully registered and writes to the socket all messages that were published to that channel before the actor subscribed.
# File 'lib/celluloid_pubsub/web_server.rb', line 352
def try_sending_unpublished
  CelluloidPubsub::Registry..each_key do |channel|
    next if (clients = subscribers[channel]).blank?
    clients.dup.pmap do |hash|
      hash[:reactor].send_unpublished(channel)
    end
    clients.last[:reactor].(channel)
  end
end
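The general idea, replaying messages that were published before anyone subscribed, can be sketched in plain Ruby without actors. Everything here is illustrative: `pending` and `subscribers` are hypothetical data structures, each subscriber is modelled as an array acting as an inbox, and the sketch assumes the backlog is cleared after delivery, which the listing does not show explicitly.

```ruby
# Messages published while nobody was listening, keyed by channel (illustrative data).
pending = { 'news' => %w[first second], 'sports' => ['goal'] }

# Each subscriber is modelled as an array that collects what it receives.
subscribers = { 'news' => [[], []] }

pending.each_key do |channel|
  clients = subscribers[channel]
  next if clients.nil? || clients.empty?   # nobody to deliver to yet, keep the backlog

  # Deliver the whole backlog to every current subscriber of the channel.
  clients.each { |inbox| inbox.concat(pending[channel]) }

  # Assumption: once delivered, the backlog for this channel is cleared.
  pending[channel] = []
end

puts subscribers.inspect
puts pending.inspect
```

Channels without subscribers keep their backlog, so a later pass can still deliver it once a client registers.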