Class: Raktr
- Inherits:
-
Object
- Object
- Raktr
- Defined in:
- lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb
Overview
This file is part of the Raktr project and may be subject to
redistribution and commercial restrictions. Please see the Raktr
web site for more information on licensing and terms of use.
Direct Known Subclasses
Defined Under Namespace
Classes: Connection, Error, Global, Iterator, Queue, Tasks
Constant Summary collapse
- DEFAULT_OPTIONS =
{ select_timeout: 0.02, max_tick_interval: 0.02 }
- VERSION =
'0.0.3'
Instance Attribute Summary collapse
-
#connections ⇒ Hash{Socket => Connection}
readonly
Attached connections.
-
#max_tick_interval ⇒ Integer?
Amount of time to wait for a connection.
-
#ticks ⇒ Integer
readonly
Amount of ticks.
Class Method Summary collapse
-
.global ⇒ Reactor
Lazy-loaded, globally accessible Reactor.
- .jruby? ⇒ Boolean
-
.stop ⇒ Object
Stops the global Reactor instance and destroys it.
- .supports_unix_sockets? ⇒ Boolean
Instance Method Summary collapse
- #at_interval(interval, &block) ⇒ Object
-
#attach(connection) ⇒ Object
Attaches a connection to the Reactor loop.
-
#attached?(connection) ⇒ Bool
`true` if the connection is attached, `false` otherwise.
-
#connect(*args, &block) ⇒ Connection
Connects to a peer.
-
#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator
New Reactor::Iterator with `self` as the scheduler.
-
#create_queue ⇒ Reactor::Queue
New Reactor::Queue with `self` as the scheduler.
- #delay(time, &block) ⇒ Object
-
#detach(connection) ⇒ Object
Detaches a connection from the Reactor loop.
-
#in_same_thread? ⇒ Bool
`true` if the caller is in the same #thread as the reactor loop, `false` otherwise.
-
#initialize(options = {}) ⇒ Raktr
constructor
A new instance of Raktr.
-
#listen(*args, &block) ⇒ Connection
Listens for incoming connections.
- #next_tick(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_shutdown(&block) ⇒ Object
- #on_tick(&block) ⇒ Object
- #run(&block) ⇒ Object
-
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the given `block` executes and then stops it (see #stop).
- #run_in_thread(&block) ⇒ Thread
-
#running? ⇒ Bool
`true` if the Reactor is running, `false` otherwise.
- #schedule(&block) ⇒ Object
-
#stop ⇒ Object
Stops the Reactor loop as soon as possible.
-
#thread ⇒ Thread?
Thread of the loop, `nil` if not running.
-
#wait ⇒ Object
Waits for the Reactor to stop running (see #running?).
Constructor Details
#initialize(options = {}) ⇒ Raktr
Returns a new instance of Raktr.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/raktr.rb', line 128 def initialize( options = {} ) options = DEFAULT_OPTIONS.merge( options ) @max_tick_interval = options[:max_tick_interval] @select_timeout = options[:select_timeout] # Socket => Connection @connections = {} @stop = false @ticks = 0 @thread = nil @tasks = Tasks.new @error_handlers = Tasks.new @shutdown_tasks = Tasks.new @done_signal = ::Queue.new end |
Instance Attribute Details
#connections ⇒ Hash{Socket => Connection} (readonly)
74 75 76 |
# File 'lib/raktr.rb', line 74 def connections @connections end |
#max_tick_interval ⇒ Integer?
70 71 72 |
# File 'lib/raktr.rb', line 70 def max_tick_interval @max_tick_interval end |
#ticks ⇒ Integer (readonly)
78 79 80 |
# File 'lib/raktr.rb', line 78 def ticks @ticks end |
Class Method Details
.global ⇒ Reactor
89 90 91 |
# File 'lib/raktr.rb', line 89 def global @raktr ||= Global.instance end |
.jruby? ⇒ Boolean
116 117 118 |
# File 'lib/raktr.rb', line 116 def jruby? RUBY_PLATFORM == 'java' end |
.stop ⇒ Object
Stops the global Reactor instance and destroys it. The next call to global will return a new instance.
95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/raktr.rb', line 95 def stop return if !@raktr global.stop rescue Error::NotRunning # Admittedly not the cleanest solution, but that's the only way to # force a Singleton to re-initialize -- and we want the Singleton to # cleanly implement the pattern in a Thread-safe way. global.class.instance_variable_set(:@singleton__instance__, nil) @raktr = nil end |
.supports_unix_sockets? ⇒ Boolean
108 109 110 111 112 113 114 |
# File 'lib/raktr.rb', line 108 def supports_unix_sockets? return false if jruby? !!UNIXSocket rescue NameError false end |
Instance Method Details
#at_interval(interval, &block) ⇒ Object
Time accuracy cannot be guaranteed.
450 451 452 453 454 |
# File 'lib/raktr.rb', line 450 def at_interval( interval, &block ) fail_if_not_running @tasks << Tasks::Periodic.new( interval, &block ) nil end |
#attach(connection) ⇒ Object
Will call Raktr::Connection::Callbacks#on_attach.
Attaches a connection to the Reactor loop.
493 494 495 496 497 498 499 500 501 |
# File 'lib/raktr.rb', line 493 def attach( connection ) return if attached? connection schedule do connection.raktr = self @connections[connection.to_io] = connection connection.on_attach end end |
#attached?(connection) ⇒ Bool
522 523 524 |
# File 'lib/raktr.rb', line 522 def attached?( connection ) @connections.include? connection.to_io end |
#connect(host, port, handler = Connection, *handler_options) ⇒ Connection #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.
Connects to a peer.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/raktr.rb', line 188 def connect( *args, &block ) fail_if_not_running options = determine_connection_options( *args ) connection = options[:handler].new( *options[:handler_options] ) connection.raktr = self block.call connection if block_given? begin socket = options[:unix_socket] ? connect_unix( options[:unix_socket] ) : connect_tcp connection.configure options.merge( socket: socket, role: :client ) attach connection rescue => e connection.close e raise end connection end |
#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator
152 153 154 |
# File 'lib/raktr.rb', line 152 def create_iterator( list, concurrency = 1 ) Raktr::Iterator.new( self, list, concurrency ) end |
#create_queue ⇒ Reactor::Queue
158 159 160 |
# File 'lib/raktr.rb', line 158 def create_queue Raktr::Queue.new self end |
#delay(time, &block) ⇒ Object
Time accuracy cannot be guaranteed.
464 465 466 467 468 |
# File 'lib/raktr.rb', line 464 def delay( time, &block ) fail_if_not_running @tasks << Tasks::Delayed.new( time, &block ) nil end |
#detach(connection) ⇒ Object
Will call Raktr::Connection::Callbacks#on_detach.
Detaches a connection from the Reactor loop.
510 511 512 513 514 515 516 517 518 |
# File 'lib/raktr.rb', line 510 def detach( connection ) return if !attached?( connection ) schedule do connection.on_detach @connections.delete connection.to_io connection.raktr = nil end end |
#in_same_thread? ⇒ Bool
Returns `true` if the caller is in the same #thread as the reactor loop, `false` otherwise.
481 482 483 484 |
# File 'lib/raktr.rb', line 481 def in_same_thread? fail_if_not_running Thread.current == thread end |
#listen(host, port, handler = Connection, *handler_options) ⇒ Connection #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.
Listens for incoming connections.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/raktr.rb', line 245 def listen( *args, &block ) fail_if_not_running options = determine_connection_options( *args ) server_handler = proc do c = options[:handler].new( *options[:handler_options] ) c.raktr = self block.call c if block_given? c end server = server_handler.call begin socket = options[:unix_socket] ? listen_unix( options[:unix_socket] ) : listen_tcp( options[:host], options[:port] ) server.configure options.merge( socket: socket, role: :server, server_handler: server_handler ) attach server rescue => e server.close e raise end server end |
#next_tick(&block) ⇒ Object
436 437 438 439 440 |
# File 'lib/raktr.rb', line 436 def next_tick( &block ) fail_if_not_running @tasks << Tasks::OneOff.new( &block ) nil end |
#on_error(&block) ⇒ Object
388 389 390 391 392 |
# File 'lib/raktr.rb', line 388 def on_error( &block ) fail_if_not_running @error_handlers << Tasks::Persistent.new( &block ) nil end |
#on_shutdown(&block) ⇒ Object
426 427 428 429 430 |
# File 'lib/raktr.rb', line 426 def on_shutdown( &block ) fail_if_not_running @shutdown_tasks << Tasks::OneOff.new( &block ) nil end |
#on_tick(&block) ⇒ Object
398 399 400 401 402 |
# File 'lib/raktr.rb', line 398 def on_tick( &block ) fail_if_not_running @tasks << Tasks::Persistent.new( &block ) nil end |
#run(&block) ⇒ Object
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/raktr.rb', line 294 def run( &block ) fail_if_running @done_signal.clear @thread = Thread.current block.call( self ) if block_given? loop do begin @tasks.call rescue => e @error_handlers.call( e ) end break if @stop begin process_connections rescue => e @error_handlers.call( e ) end break if @stop @ticks += 1 end @tasks.clear close_connections @shutdown_tasks.call @ticks = 0 @thread = nil @done_signal << nil end |
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the given `block` executes and then stops it (see #stop).
373 374 375 376 377 378 379 380 381 |
# File 'lib/raktr.rb', line 373 def run_block( &block ) fail ArgumentError, 'Missing block.' if !block_given? fail_if_running run do block.call next_tick { stop } end end |
#run_in_thread(&block) ⇒ Thread
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/raktr.rb', line 340 def run_in_thread( &block ) fail_if_running Thread.new do begin run(&block) rescue => e @error_handlers.call( e ) end end sleep 0.1 while !running? thread end |
#running? ⇒ Bool
276 277 278 |
# File 'lib/raktr.rb', line 276 def running? thread && thread.alive? end |
#schedule(&block) ⇒ Object
410 411 412 413 414 415 416 417 418 419 420 |
# File 'lib/raktr.rb', line 410 def schedule( &block ) fail_if_not_running if in_same_thread? block.call self else next_tick(&block) end nil end |
#stop ⇒ Object
Stops the Reactor loop as soon as possible.
283 284 285 |
# File 'lib/raktr.rb', line 283 def stop schedule { @stop = true } end |
#thread ⇒ Thread?
472 473 474 |
# File 'lib/raktr.rb', line 472 def thread @thread end |
#wait ⇒ Object
Waits for the Reactor to stop running (see #running?).
359 360 361 362 363 364 |
# File 'lib/raktr.rb', line 359 def wait fail_if_not_running @done_signal.pop true end |