Class: Raktr
- Inherits:
-
Object
- Object
- Raktr
- Defined in:
- lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb
Overview
This file is part of the Raktr project and may be subject to
redistribution and commercial restrictions. Please see the Raktr
web site for more information on licensing and terms of use.
Direct Known Subclasses
Defined Under Namespace
Classes: Connection, Error, Global, Iterator, Queue, Tasks
Constant Summary collapse
- DEFAULT_OPTIONS =
{ select_timeout: 0.02, max_tick_interval: 0.02 }
- VERSION =
'0.1.0'
Instance Attribute Summary collapse
-
#connections ⇒ Array<Connection>
readonly
Attached connections.
-
#max_tick_interval ⇒ Integer?
Amount of time to wait for a connection.
-
#ticks ⇒ Integer
readonly
Amount of ticks.
Class Method Summary collapse
-
.global ⇒ Reactor
Lazy-loaded, globally accessible Reactor.
- .jruby? ⇒ Boolean
-
.stop ⇒ Object
Stops the global Reactor instance and destroys it.
- .supports_unix_sockets? ⇒ Boolean
Instance Method Summary collapse
- #at_interval(interval, &block) ⇒ Object
-
#attach(connection) ⇒ Object
Attaches a connection to the Reactor loop.
-
#attached?(connection) ⇒ Bool
‘true` if the connection is attached, `false` otherwise.
-
#connect(*args, &block) ⇒ Connection
Connects to a peer.
-
#create_iterator(list, concurrency = 1000) ⇒ Reactor::Iterator
New Reactor::Iterator with ‘self` as the scheduler.
-
#create_queue ⇒ Reactor::Queue
New Reactor::Queue with ‘self` as the scheduler.
- #delay(time, &block) ⇒ Object
-
#detach(connection) ⇒ Object
Detaches a connection from the Reactor loop.
-
#in_same_thread? ⇒ Bool
‘true` if the caller is in the same #thread as the reactor loop, `false` otherwise.
-
#initialize(options = {}) ⇒ Raktr
constructor
A new instance of Raktr.
-
#listen(*args, &block) ⇒ Connection
Listens for incoming connections.
- #next_tick(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_shutdown(&block) ⇒ Object
- #on_tick(&block) ⇒ Object
- #run(&block) ⇒ Object
-
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the given ‘block` executes and then #stops it.
- #run_in_thread(&block) ⇒ Thread
-
#running? ⇒ Bool
‘true` if the Reactor is running, `false` otherwise.
- #schedule(&block) ⇒ Object
-
#stop ⇒ Object
Stops the Reactor loop as soon as possible.
-
#thread ⇒ Thread?
Thread of the loop, ‘nil` if not running.
-
#wait ⇒ Object
Waits for the Reactor to stop #running?.
Constructor Details
#initialize(options = {}) ⇒ Raktr
Returns a new instance of Raktr.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/raktr.rb', line 128 def initialize( = {} ) = DEFAULT_OPTIONS.merge( ) @max_tick_interval = [:max_tick_interval] @select_timeout = [:select_timeout] # Socket => Connection @connections = {} @stop = false @ticks = 0 @thread = nil @tasks = Tasks.new @error_handlers = Tasks.new @shutdown_tasks = Tasks.new @done_signal = ::Queue.new end |
Instance Attribute Details
#connections ⇒ Array<Connection> (readonly)
Returns Attached connections.
74 75 76 |
# File 'lib/raktr.rb', line 74 def connections @connections end |
#max_tick_interval ⇒ Integer?
Returns Amount of time to wait for a connection.
70 71 72 |
# File 'lib/raktr.rb', line 70 def max_tick_interval @max_tick_interval end |
#ticks ⇒ Integer (readonly)
Returns Amount of ticks.
78 79 80 |
# File 'lib/raktr.rb', line 78 def ticks @ticks end |
Class Method Details
.global ⇒ Reactor
Returns Lazy-loaded, globally accessible Reactor.
89 90 91 |
# File 'lib/raktr.rb', line 89 def global @raktr ||= Global.instance end |
.jruby? ⇒ Boolean
116 117 118 |
# File 'lib/raktr.rb', line 116 def jruby? RUBY_PLATFORM == 'java' end |
.stop ⇒ Object
Stops the global Reactor instance and destroys it. The next call to global will return a new instance.
95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/raktr.rb', line 95 def stop return if !@raktr global.stop rescue Error::NotRunning # Admittedly not the cleanest solution, but that's the only way to # force a Singleton to re-initialize -- and we want the Singleton to # cleanly implement the pattern in a Thread-safe way. global.class.instance_variable_set(:@singleton__instance__, nil) @raktr = nil end |
.supports_unix_sockets? ⇒ Boolean
108 109 110 111 112 113 114 |
# File 'lib/raktr.rb', line 108 def supports_unix_sockets? return false if jruby? !!UNIXSocket rescue NameError false end |
Instance Method Details
#at_interval(interval, &block) ⇒ Object
Time accuracy cannot be guaranteed.
464 465 466 467 468 |
# File 'lib/raktr.rb', line 464 def at_interval( interval, &block ) fail_if_not_running @tasks << Tasks::Periodic.new( interval, &block ) nil end |
#attach(connection) ⇒ Object
Will call Raktr::Connection::Callbacks#on_attach.
Attaches a connection to the Reactor loop.
507 508 509 510 511 512 513 514 515 |
# File 'lib/raktr.rb', line 507 def attach( connection ) return if attached? connection schedule do connection.raktr = self @connections[connection.to_io] = connection connection.on_attach end end |
#attached?(connection) ⇒ Bool
Returns ‘true` if the connection is attached, `false` otherwise.
536 537 538 |
# File 'lib/raktr.rb', line 536 def attached?( connection ) @connections.include? connection.to_io end |
#connect(host, port, handler = Connection, *handler_options) ⇒ Connection #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.
Connects to a peer.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/raktr.rb', line 188 def connect( *args, &block ) fail_if_not_running tls = nil if args.last.is_a?( Hash ) && args.last.include?( :tls ) tls = args.last[:tls] else tls = ( :server ) end = ( *args ) connection = [:handler].new( *[:handler_options] ) connection.raktr = self block.call connection if block_given? begin socket = [:unix_socket] ? connect_unix( [:unix_socket] ) : connect_tcp connection.configure .merge( socket: socket, tls: tls, role: :client ) attach connection rescue => e connection.close e raise end connection end |
#create_iterator(list, concurrency = 1000) ⇒ Reactor::Iterator
Returns New Reactor::Iterator with ‘self` as the scheduler.
152 153 154 |
# File 'lib/raktr.rb', line 152 def create_iterator( list, concurrency = 1000 ) Raktr::Iterator.new( self, list, concurrency ) end |
#create_queue ⇒ Reactor::Queue
Returns New Reactor::Queue with ‘self` as the scheduler.
158 159 160 |
# File 'lib/raktr.rb', line 158 def create_queue Raktr::Queue.new self end |
#delay(time, &block) ⇒ Object
Time accuracy cannot be guaranteed.
478 479 480 481 482 |
# File 'lib/raktr.rb', line 478 def delay( time, &block ) fail_if_not_running @tasks << Tasks::Delayed.new( time, &block ) nil end |
#detach(connection) ⇒ Object
Will call Raktr::Connection::Callbacks#on_detach.
Detaches a connection from the Reactor loop.
524 525 526 527 528 529 530 531 532 |
# File 'lib/raktr.rb', line 524 def detach( connection ) return if !attached?( connection ) schedule do connection.on_detach @connections.delete connection.to_io connection.raktr = nil end end |
#in_same_thread? ⇒ Bool
Returns ‘true` if the caller is in the same #thread as the reactor loop, `false` otherwise.
495 496 497 498 |
# File 'lib/raktr.rb', line 495 def in_same_thread? fail_if_not_running Thread.current == thread end |
#listen(host, port, handler = Connection, *handler_options) ⇒ Connection #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection
Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.
Listens for incoming connections.
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/raktr.rb', line 252 def listen( *args, &block ) fail_if_not_running tls = nil if args.last.is_a?( Hash ) && args.last.include?( :tls ) tls = args.last[:tls] else tls = ( :server ) end = ( *args ) server_handler = proc do c = [:handler].new( *[:handler_options] ) c.raktr = self block.call c if block_given? c end server = server_handler.call begin socket = [:unix_socket] ? listen_unix( [:unix_socket] ) : listen_tcp( [:host], [:port] ) server.configure .merge( socket: socket, tls: tls, role: :server, server_handler: server_handler ) attach server rescue => e server.close e raise end server end |
#next_tick(&block) ⇒ Object
450 451 452 453 454 |
# File 'lib/raktr.rb', line 450 def next_tick( &block ) fail_if_not_running @tasks << Tasks::OneOff.new( &block ) nil end |
#on_error(&block) ⇒ Object
402 403 404 405 406 |
# File 'lib/raktr.rb', line 402 def on_error( &block ) fail_if_not_running @error_handlers << Tasks::Persistent.new( &block ) nil end |
#on_shutdown(&block) ⇒ Object
440 441 442 443 444 |
# File 'lib/raktr.rb', line 440 def on_shutdown( &block ) fail_if_not_running @shutdown_tasks << Tasks::OneOff.new( &block ) nil end |
#on_tick(&block) ⇒ Object
412 413 414 415 416 |
# File 'lib/raktr.rb', line 412 def on_tick( &block ) fail_if_not_running @tasks << Tasks::Persistent.new( &block ) nil end |
#run(&block) ⇒ Object
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/raktr.rb', line 308 def run( &block ) fail_if_running @done_signal.clear @thread = Thread.current block.call( self ) if block_given? loop do begin @tasks.call rescue => e @error_handlers.call( e ) end break if @stop begin process_connections rescue => e @error_handlers.call( e ) end break if @stop @ticks += 1 end @tasks.clear close_connections @shutdown_tasks.call @ticks = 0 @thread = nil @done_signal << nil end |
#run_block(&block) ⇒ Object
Starts the Reactor loop, blocks the current #thread while the given ‘block` executes and then #stops it.
387 388 389 390 391 392 393 394 395 |
# File 'lib/raktr.rb', line 387 def run_block( &block ) fail ArgumentError, 'Missing block.' if !block_given? fail_if_running run do block.call next_tick { stop } end end |
#run_in_thread(&block) ⇒ Thread
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
# File 'lib/raktr.rb', line 354 def run_in_thread( &block ) fail_if_running Thread.new do begin run(&block) rescue => e @error_handlers.call( e ) end end sleep 0.1 while !running? thread end |
#running? ⇒ Bool
Returns ‘true` if the Reactor is running, `false` otherwise.
290 291 292 |
# File 'lib/raktr.rb', line 290 def running? thread && thread.alive? end |
#schedule(&block) ⇒ Object
424 425 426 427 428 429 430 431 432 433 434 |
# File 'lib/raktr.rb', line 424 def schedule( &block ) fail_if_not_running if in_same_thread? block.call self else next_tick(&block) end nil end |
#stop ⇒ Object
Stops the Reactor loop as soon as possible.
297 298 299 |
# File 'lib/raktr.rb', line 297 def stop schedule { @stop = true } end |
#thread ⇒ Thread?
Returns Thread of the loop, ‘nil` if not running.
486 487 488 |
# File 'lib/raktr.rb', line 486 def thread @thread end |
#wait ⇒ Object
Waits for the Reactor to stop #running?.
373 374 375 376 377 378 |
# File 'lib/raktr.rb', line 373 def wait fail_if_not_running @done_signal.pop true end |