Class: PuppetEditorServices::Server::Tcp
- Defined in:
- lib/puppet_editor_services/server/tcp.rb
Class Attribute Summary collapse
-
.c_locker ⇒ Object
readonly
Returns the value of attribute c_locker.
-
.e_locker ⇒ Object
readonly
Returns the value of attribute e_locker.
-
.events ⇒ Object
readonly
Returns the value of attribute events.
-
.io_connection_dic ⇒ Object
readonly
Returns the value of attribute io_connection_dic.
-
.io_locker ⇒ Object
readonly
Returns the value of attribute io_locker.
-
.s_locker ⇒ Object
readonly
Returns the value of attribute s_locker.
-
.services ⇒ Object
readonly
Returns the value of attribute services.
Attributes inherited from Base
#connection_options, #handler_options, #protocol_options, #server_options
Instance Method Summary collapse
- #add_connection(io, service_object) ⇒ Object private
- #add_service(hostname = 'localhost', port = nil, parameters = {}) ⇒ Object private
-
#callback(object, method, *args, &block) ⇒ Object
private
creates an asynchronous call to a method, with an optional callback (shortcut).
-
#clear_connections ⇒ Object
private
clears closed connections from the stack.
- #connection(connection_id) ⇒ Object
-
#events? ⇒ Boolean
private
Events (Callbacks) / Multi-tasking Platform returns true if there are any unhandled events.
-
#fire_event ⇒ Object
private
event handling FIFO.
-
#get_data(io, connection_data) ⇒ Object
private
this code will be called when a socket recieves data.
-
#initialize(server_options, protocol_options, handler_options) ⇒ Tcp
constructor
A new instance of Tcp.
-
#io_review ⇒ Object
private
Reactor.
- #name ⇒ Object
-
#push_event(handler, *args, &block) ⇒ Object
private
pushes an event to the event’s stack if a block is passed along, it will be used as a callback: the block will be called with the values returned by the handler’s ‘call` method.
- #remove_connection(io) ⇒ Object private
- #remove_connection_async(io) ⇒ Object
-
#run_async(*args, &block) ⇒ Object
private
Runs the block asynchronously by pushing it as an event to the event’s stack.
-
#start ⇒ Object
main loop and activation code.
- #stop_all_services ⇒ Object private
- #stop_connections ⇒ Object private
- #stop_services(from_trap = false) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(server_options, protocol_options, handler_options) ⇒ Tcp
Returns a new instance of Tcp.
31 32 33 34 35 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 31 def initialize(, , ) super(, {}, , ) add_service([:ipaddress], [:port]) end |
Class Attribute Details
.c_locker ⇒ Object (readonly)
Returns the value of attribute c_locker.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def c_locker @c_locker end |
.e_locker ⇒ Object (readonly)
Returns the value of attribute e_locker.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def e_locker @e_locker end |
.events ⇒ Object (readonly)
Returns the value of attribute events.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def events @events end |
.io_connection_dic ⇒ Object (readonly)
Returns the value of attribute io_connection_dic.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def io_connection_dic @io_connection_dic end |
.io_locker ⇒ Object (readonly)
Returns the value of attribute io_locker.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def io_locker @io_locker end |
.s_locker ⇒ Object (readonly)
Returns the value of attribute s_locker.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def s_locker @s_locker end |
.services ⇒ Object (readonly)
Returns the value of attribute services.
20 21 22 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 20 def services @services end |
Instance Method Details
#add_connection(io, service_object) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 289 def add_connection(io, service_object) conn = ::PuppetEditorServices::Connection::Tcp.new(self, io) if io self.class.c_locker.synchronize do self.class.io_connection_dic[io] = { handler: conn, service: service_object } end end callback(conn, :post_init) rescue Exception => e # rubocop:disable Lint/RescueException Need to swallow all errors here callback(self, :log, "Error creating connection #{e.inspect}\n#{e.backtrace}") end |
#add_service(hostname = 'localhost', port = nil, parameters = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
230 231 232 233 234 235 236 237 238 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 230 def add_service(hostname = 'localhost', port = nil, parameters = {}) hostname = 'localhost' if hostname.nil? || hostname.empty? service = TCPServer.new(hostname, port) parameters[:hostname] = hostname parameters[:port] = service.local_address.ip_port self.class.s_locker.synchronize { self.class.services[service] = parameters } callback(self, :log, "Started listening on #{hostname}:#{parameters[:port]}.") true end |
#callback(object, method, *args, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
creates an asynchronous call to a method, with an optional callback (shortcut)
159 160 161 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 159 def callback(object, method, *args, &block) push_event object.method(method), *args, &block end |
#clear_connections ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
clears closed connections from the stack
341 342 343 344 345 346 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 341 def clear_connections # Using a SymbolProc here does not work # rubocop:disable Style/SymbolProc self.class.c_locker.synchronize { self.class.io_connection_dic.delete_if { |c| c.closed? } } # rubocop:enable Style/SymbolProc end |
#connection(connection_id) ⇒ Object
302 303 304 305 306 307 308 309 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 302 def connection(connection_id) self.class.c_locker.synchronize do self.class.io_connection_dic.each_value do |v| return v[:handler] unless v[:handler].nil? || v[:handler].id != connection_id end end nil end |
#events? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Events (Callbacks) / Multi-tasking Platform returns true if there are any unhandled events
134 135 136 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 134 def events? self.class.e_locker.synchronize { !self.class.events.empty? } end |
#fire_event ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
event handling FIFO
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 165 def fire_event event = self.class.e_locker.synchronize { self.class.events.shift } return false unless event begin event[0].call(*event[1]) rescue OpenSSL::SSL::SSLError log('SSL Bump - SSL Certificate refused?') # rubocop:disable Lint/RescueException rescue Exception => e raise if e.is_a?(SignalException) || e.is_a?(SystemExit) end # rubocop:enable Lint/RescueException true end |
#get_data(io, connection_data) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
this code will be called when a socket recieves data.
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 44 def get_data(io, connection_data) data = io.recv_nonblock(1_048_576) # with maximum number of bytes to read at a time... raise 'Received a 0byte payload' if data.length.zero? # We're already in a callback so no need to invoke as a callback connection_data[:handler].receive_data(data) rescue StandardError => e log("Closing socket due to error - #{e}\n#{e.backtrace}") remove_connection(io) end |
#io_review ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reactor
IO review code will review the connections and sockets it will accept new connections and react to socket input
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 188 def io_review self.class.io_locker.synchronize do return false unless self.class.events.empty? united = self.class.services.keys + self.class.io_connection_dic.keys return false if united.empty? io_r = IO.select(united, nil, united, 0.1) if io_r io_r[0].each do |io| if self.class.services[io] begin callback(self, :add_connection, io.accept_nonblock, self.class.services[io]) rescue Errno::EWOULDBLOCK # There's nothing to handle. Swallow the error rescue StandardError => e log(e.) end elsif self.class.io_connection_dic[io] callback(self, :get_data, io, self.class.io_connection_dic[io]) else log('what?!') remove_connection(io) self.class.services.delete(io) end end io_r[2].each do |io| (remove_connection(io) || self.class.services.delete(io)).close rescue # rubocop:disable Style/RescueStandardError # Swallow all errors true end end end callback self, :clear_connections true end |
#name ⇒ Object
37 38 39 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 37 def name 'TCPSRV' end |
#push_event(handler, *args, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
pushes an event to the event’s stack if a block is passed along, it will be used as a callback: the block will be called with the values returned by the handler’s ‘call` method.
141 142 143 144 145 146 147 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 141 def push_event(handler, *args, &block) if block self.class.e_locker.synchronize { self.class.events << [proc { |a| push_event block, handler.call(*a) }, args] } else self.class.e_locker.synchronize { self.class.events << [handler, args] } end end |
#remove_connection(io) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 312 def remove_connection(io) # This needs to be synchronous begin self.class.io_connection_dic[io][:handler].unbind rescue e # Any errors when unbinding the handler should NOT stop the underlying socket # from being closed log("Error unbinding #{e.inspect}\n#{e.backtrace}") end connection_count = 0 self.class.c_locker.synchronize do self.class.io_connection_dic.delete io connection_count = self.class.io_connection_dic.count begin io.close rescue # rubocop:disable Style/RescueStandardError # Swallow all errors true end end return unless connection_count.zero? && ![:stop_on_client_exit].nil? && [:stop_on_client_exit] callback(self, :log, 'All clients have disconnected. Shutting down server.') callback(self, :stop_services) end |
#remove_connection_async(io) ⇒ Object
268 269 270 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 268 def remove_connection_async(io) callback(self, :remove_connection, io) end |
#run_async(*args, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Runs the block asynchronously by pushing it as an event to the event’s stack
152 153 154 155 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 152 def run_async(*args, &block) self.class.e_locker.synchronize { self.class.events << [block, args] } if block !block.nil? end |
#start ⇒ Object
main loop and activation code
This will create a thread pool and set them running.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 60 def start # prepare threads max_threads = [:max_threads] || 2 exit_flag = false threads = [] thread_cycle = proc do begin io_review rescue # rubocop:disable Style/RescueStandardError # Swallow all errors false end true while fire_event end max_threads.times { Thread.new { thread_cycle.call until exit_flag } } log('Services running. Press ^C to stop') # sleep until trap raises exception (cycling might cause the main thread to loose signals that might be caught inside rescue clauses) kill_timer = [:connection_timeout] kill_timer = -1 if kill_timer.nil? || kill_timer < 1 log("Will stop the server in #{server_options[:connection_timeout]} seconds if no connection is made.") if kill_timer > 0 log('Will stop the server when client disconnects') if ![:stop_on_client_exit].nil? && [:stop_on_client_exit] # Output to STDOUT. This is required by clients so it knows the server is now running self.class.s_locker.synchronize do self.class.services.each_value do |service| $stdout.write("#{server_options[:servicename]} RUNNING #{service[:hostname]}:#{service[:port]}\n") end end $stdout.flush loop do begin sleep(1) # The kill_timer is used to stop the server if no clients have connected in X seconds # a value of 0 or less will not timeout. if kill_timer > 0 kill_timer -= 1 if kill_timer.zero? connection_count = 0 self.class.c_locker.synchronize { connection_count = self.class.io_connection_dic.count } if connection_count.zero? log("No connection has been received in #{server_options[:connection_timeout]} seconds. Shutting down server.") stop_services end end end rescue # rubocop:disable Style/RescueStandardError # Swallow all errors true end break if self.class.services.empty? end # start shutdown. exit_flag = true log('Started shutdown process. Press ^C to force quit.') # shut down listening sockets stop_services # disconnect active connections stop_connections # cycle down threads log('Waiting for workers to cycle down') threads.each { |t| t.join if t.alive? } # rundown any active events thread_cycle.call end |
#stop_all_services ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 254 def stop_all_services self.class.services.each do |s, p| begin s.close rescue # rubocop:disable Style/RescueStandardError # Swallow all errors true end log("Stopped listening on #{p[:hostname]}:#{p[:port]}") end self.class.services.clear end |
#stop_connections ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 276 def stop_connections self.class.c_locker.synchronize do self.class.io_connection_dic.each_key do |io| io.close rescue # rubocop:disable Style/RescueStandardError # Swallow all errors true end self.class.io_connection_dic.clear end end |
#stop_services(from_trap = false) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/puppet_editor_services/server/tcp.rb', line 241 def stop_services(from_trap = false) log('Stopping services') if from_trap # synchronize is not allowed when called from a trap statement stop_all_services else self.class.s_locker.synchronize do stop_all_services end end end |