Class: Raktr

Inherits:
Object
Defined in:
lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb

Overview

This file is part of the Raktr project and may be subject to redistribution and commercial restrictions. Please see the Raktr web site for more information on licensing and terms of use.

Direct Known Subclasses

Global

Defined Under Namespace

Classes: Connection, Error, Global, Iterator, Queue, Tasks

Constant Summary

DEFAULT_OPTIONS =
{
    select_timeout:    0.02,
    max_tick_interval: 0.02
}
VERSION =
'0.0.3'

Constructor Details

#initialize(options = {}) ⇒ Raktr

Returns a new instance of Raktr.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_tick_interval (Float, nil) (defaults to: 0.02)

    How long to wait, in seconds, for each tick when no connections are available for processing.

  • :select_timeout (Float) (defaults to: 0.02)

    How long to wait, in seconds, for connection activity before continuing to the next tick.



# File 'lib/raktr.rb', line 128

def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end
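
The option handling in the constructor comes down to a single `Hash#merge` call. A minimal sketch of that behaviour, reusing the `DEFAULT_OPTIONS` values from the constant summary; the `effective_options` helper is hypothetical and exists only for illustration:

```ruby
# Values copied from the constant summary of this page.
DEFAULT_OPTIONS = {
    select_timeout:    0.02,
    max_tick_interval: 0.02
}

# Hypothetical helper mirroring the first line of #initialize.
def effective_options( options = {} )
    DEFAULT_OPTIONS.merge( options )
end

defaults   = effective_options
overridden = effective_options( max_tick_interval: 0.5 )
disabled   = effective_options( max_tick_interval: nil )
```

Since `merge` keeps explicitly passed keys, `max_tick_interval: nil` yields `nil` rather than falling back to the default.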

Instance Attribute Details

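#connections ⇒ Hash{Socket => Connection} (readonly)

Returns the attached connections, keyed by socket.

Returns:

  • (Hash{Socket => Connection})

    Attached connections, keyed by socket.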



# File 'lib/raktr.rb', line 74

def connections
  @connections
end

#max_tick_interval ⇒ Float?

Returns the amount of time to wait for a connection.

Returns:

  • (Float, nil)

    Amount of time to wait for a connection.



# File 'lib/raktr.rb', line 70

def max_tick_interval
  @max_tick_interval
end

#ticks ⇒ Integer (readonly)

Returns the number of ticks.

Returns:

  • (Integer)

    Number of ticks.



# File 'lib/raktr.rb', line 78

def ticks
  @ticks
end

Class Method Details

.global ⇒ Raktr

Returns the lazy-loaded, globally accessible Reactor.

Returns:

  • (Raktr)

    Lazy-loaded, globally accessible Reactor.



# File 'lib/raktr.rb', line 89

def global
    @raktr ||= Global.instance
end

.jruby? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/raktr.rb', line 116

def jruby?
    RUBY_PLATFORM == 'java'
end

.stop ⇒ Object

Stops the global Reactor instance and destroys it. The next call to global will return a new instance.



# File 'lib/raktr.rb', line 95

def stop
    return if !@raktr

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @raktr = nil
end
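
The re-initialization trick used by `.stop` can be tried in isolation. The sketch below is not Raktr code: `GlobalSketch` is a hypothetical stand-in for the Global class, and the approach relies on an internal instance variable of the stdlib `Singleton` module rather than on public API:

```ruby
require 'singleton'

# Hypothetical stand-in for the Global subclass.
class GlobalSketch
    include Singleton
end

first = GlobalSketch.instance

# Clear the variable in which the stdlib Singleton module caches the
# instance (an implementation detail, so this may break in future Rubies).
GlobalSketch.instance_variable_set( :@singleton__instance__, nil )

second      = GlobalSketch.instance
same_object = first.equal?( second )
```

After the reset, `instance` builds a fresh object, which is why the next call to `.global` can return a new Reactor.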

.supports_unix_sockets? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/raktr.rb', line 108

def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end
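
The check above works because referencing an undefined constant raises `NameError`. A generic sketch of the same detection pattern; `constant_available?` is a hypothetical helper, not part of Raktr:

```ruby
require 'socket'

# Hypothetical helper: true if the named top-level constant resolves.
def constant_available?( name )
    !!Object.const_get( name )
rescue NameError
    false
end

has_string  = constant_available?( :String )
has_missing = constant_available?( :NoSuchConstantInThisSketch )
has_unix    = constant_available?( :UNIXSocket ) # platform dependent
```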

Instance Method Details

#at_interval(interval, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • interval (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run every `interval` seconds.

Raises:



# File 'lib/raktr.rb', line 450

def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end
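
One way to see why time accuracy cannot be guaranteed: a periodic task can only fire when the loop gets around to checking it. The sketch below uses a hypothetical `PeriodicSketch` class and a simulated clock; it does not reproduce the actual Tasks::Periodic implementation:

```ruby
# Hypothetical periodic task that is polled once per tick.
class PeriodicSketch
    def initialize( interval, &block )
        @interval = interval
        @block    = block
        @next_at  = nil
    end

    # Fires the block if `now` has reached the next due time.
    def call( now )
        @next_at ||= now + @interval
        return false if now < @next_at

        @block.call
        @next_at = now + @interval
        true
    end
end

fired_at = []
clock    = 0.0
task     = PeriodicSketch.new( 1.0 ) { fired_at << clock }

# Simulated tick times in seconds; ticks rarely land on exact multiples.
[0.0, 0.5, 1.0, 1.2, 2.1, 2.2].each do |t|
    clock = t
    task.call( t )
end
```

With these tick times the second run happens at 2.1 rather than 2.0, because no tick occurred at the exact due time.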

#attach(connection) ⇒ Object

Attaches a connection to the Reactor loop.

Parameters:

Raises:



# File 'lib/raktr.rb', line 493

def attach( connection )
    return if attached? connection

    schedule do
        connection.raktr = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end

#attached?(connection) ⇒ Bool

Returns `true` if the connection is attached, `false` otherwise.

Returns:

  • (Bool)

    `true` if the connection is attached, `false` otherwise.



# File 'lib/raktr.rb', line 522

def attached?( connection )
    @connections.include? connection.to_io
end
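
#attach and #attached? both rely on a Hash keyed by the connection's IO object. A small stand-alone sketch of that bookkeeping; `ConnectionSketch` is a hypothetical stand-in that only responds to `to_io`:

```ruby
# Hypothetical minimal connection: just wraps an IO.
ConnectionSketch = Struct.new( :to_io )

connections    = {}
reader, writer = IO.pipe
conn           = ConnectionSketch.new( reader )

# Attach: register the connection under its IO object.
connections[conn.to_io] = conn
attached = connections.include?( conn.to_io )

# Detach: remove the entry again.
connections.delete( conn.to_io )
detached = !connections.include?( conn.to_io )

reader.close
writer.close
```

Keying by IO object lets a loop map whatever `IO.select` reports as ready straight back to the owning connection.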

#connect(host, port, handler = Connection, *handler_options) ⇒ Connection
#connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.

Connects to a peer.

Overloads:

  • #connect(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

  • #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to connect.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

Returns:

  • (Connection)

    Connected instance of `handler`.

Raises:



# File 'lib/raktr.rb', line 188

def connect( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.raktr = self
    block.call connection if block_given?

    begin
        socket = options[:unix_socket] ?
            connect_unix( options[:unix_socket] ) : connect_tcp

        connection.configure options.merge( socket: socket, role: :client )
        attach connection
    rescue => e
        connection.close e
        raise
    end

    connection
end
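
The two overloads differ only in their leading arguments. How `determine_connection_options` tells them apart is not shown on this page; the sketch below is one plausible approach, assuming an Integer second argument means TCP. `parse_connection_args` and `DefaultHandler` are hypothetical names:

```ruby
# Hypothetical stand-in for the default Connection handler class.
DefaultHandler = Class.new

# Assumption: (host, port, ...) if the second argument is an Integer,
# otherwise (unix_socket, ...).
def parse_connection_args( *args )
    if args[1].is_a?( Integer )
        host, port, handler, *handler_options = args
        { host: host, port: port,
          handler: handler || DefaultHandler,
          handler_options: handler_options }
    else
        unix_socket, handler, *handler_options = args
        { unix_socket: unix_socket,
          handler: handler || DefaultHandler,
          handler_options: handler_options }
    end
end

tcp  = parse_connection_args( 'localhost', 8080 )
unix = parse_connection_args( '/tmp/sketch.sock', DefaultHandler, { a: 1 } )
```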

#create_iterator(list, concurrency = 1) ⇒ Raktr::Iterator

Returns a new Raktr::Iterator with `self` as the scheduler.

Parameters:

  • list (#to_a)

    List to iterate.

  • concurrency (Integer) (defaults to: 1)

    Parallel workers to spawn.

Returns:

  • (Raktr::Iterator)

    New Raktr::Iterator with `self` as the scheduler.



# File 'lib/raktr.rb', line 152

def create_iterator( list, concurrency = 1 )
    Raktr::Iterator.new( self, list, concurrency )
end

#create_queue ⇒ Raktr::Queue

Returns a new Raktr::Queue with `self` as the scheduler.

Returns:

  • (Raktr::Queue)

    New Raktr::Queue with `self` as the scheduler.



# File 'lib/raktr.rb', line 158

def create_queue
    Raktr::Queue.new self
end

#delay(time, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • time (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run in `time` seconds.

Raises:



# File 'lib/raktr.rb', line 464

def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end

#detach(connection) ⇒ Object

Detaches a connection from the Reactor loop.

Parameters:

Raises:



# File 'lib/raktr.rb', line 510

def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.raktr = nil
    end
end

#in_same_thread? ⇒ Bool

Returns `true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Returns:

  • (Bool)

    `true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Raises:



# File 'lib/raktr.rb', line 481

def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end

#listen(host, port, handler = Connection, *handler_options) ⇒ Connection
#listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.

Listens for incoming connections.

Overloads:

  • #listen(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::HostNotFound)

      If the `host` is invalid.

    • (Connection::Error::Permission)

      If the `port` could not be opened due to a permission error.

  • #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to create.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::Permission)

      If the `unix_socket` file could not be created due to a permission error.

Returns:

  • (Connection)

    Listening instance of `handler`.

Raises:



# File 'lib/raktr.rb', line 245

def listen( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.raktr = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        socket = options[:unix_socket] ?
            listen_unix( options[:unix_socket] ) :
            listen_tcp( options[:host], options[:port] )

        server.configure options.merge( socket: socket, role: :server, server_handler: server_handler )
        attach server
    rescue => e
        server.close e
        raise
    end

    server
end

#next_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at the next tick.

Raises:



# File 'lib/raktr.rb', line 436

def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end

#on_error(&block) ⇒ Object

Parameters:

  • block (Block)

    Passes exceptions raised in the Reactor #thread to a task.

Raises:



# File 'lib/raktr.rb', line 388

def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end

#on_shutdown(&block) ⇒ Object

Schedules a task to be run once the Reactor loop has stopped.

Parameters:

Raises:



# File 'lib/raktr.rb', line 426

def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end

#on_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at each tick.

Raises:



# File 'lib/raktr.rb', line 398

def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end

#run(&block) ⇒ Object

Starts the Reactor loop and blocks the current #thread until #stop is called.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Raises:



# File 'lib/raktr.rb', line 294

def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call( self ) if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end
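
The loop above wraps each phase in its own `begin`/`rescue`, so an exception is handed to the error handlers and the loop carries on. A stripped-down sketch of that isolation, with a lambda standing in for `@error_handlers` and a hard-coded failure standing in for `@tasks.call` (both are illustrative assumptions):

```ruby
errors   = []
on_error = ->( e ) { errors << e.message } # stands in for @error_handlers

ticks = 0
stop  = false

until stop
    begin
        # Stands in for @tasks.call; fails during the second iteration.
        raise 'boom' if ticks == 1
    rescue => e
        on_error.call( e )
    end

    ticks += 1
    stop = true if ticks == 3
end
```

The failure is recorded once and the remaining iterations still run.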

#run_block(&block) ⇒ Object

Starts the Reactor loop, blocks the current #thread while the given `block` executes and then calls #stop.

Parameters:

  • block (Block)

    Block to call.

Raises:



# File 'lib/raktr.rb', line 373

def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end

#run_in_thread(&block) ⇒ Thread

Runs the Reactor in a thread and blocks until it is #running?.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Returns:

  • (Thread)

    Reactor#thread

Raises:



# File 'lib/raktr.rb', line 340

def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end

#running? ⇒ Bool

Returns `true` if the Reactor is running, `false` otherwise.

Returns:

  • (Bool)

    `true` if the Reactor is running, `false` otherwise.



# File 'lib/raktr.rb', line 276

def running?
    thread && thread.alive?
end

#schedule(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run as soon as possible, either immediately if the caller is in the same thread, or at the #next_tick otherwise.

Raises:



# File 'lib/raktr.rb', line 410

def schedule( &block )
    fail_if_not_running

    if in_same_thread?
        block.call self
    else
        next_tick(&block)
    end

    nil
end
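
The "run now or defer" decision can be illustrated without the full Reactor. `MiniLoop` below is a hypothetical, heavily simplified stand-in: it runs a block immediately when called from the loop thread and queues it otherwise.

```ruby
# Hypothetical miniature loop illustrating the schedule semantics.
class MiniLoop
    def initialize
        @pending = Queue.new
        @thread  = nil
    end

    def schedule( &block )
        if Thread.current == @thread
            block.call        # already inside the loop: run immediately
        else
            @pending << block # outside the loop: defer
        end
        nil
    end

    # Processes everything queued so far, acting as the loop thread.
    def run_once
        @thread = Thread.current
        @pending.pop.call until @pending.empty?
    ensure
        @thread = nil
    end
end

log  = []
mini = MiniLoop.new

mini.schedule do
    log << :deferred
    mini.schedule { log << :immediate }
    log << :after
end

log_before_run = log.dup
mini.run_once
```

The outer block is deferred because the loop is not running yet; the nested call executes in place because it is made from within the loop.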

#stop ⇒ Object

Stops the Reactor loop as soon as possible.

Raises:



# File 'lib/raktr.rb', line 283

def stop
    schedule { @stop = true }
end

#thread ⇒ Thread?

Returns the Thread of the loop, `nil` if not running.

Returns:

  • (Thread, nil)

    Thread of the loop, `nil` if not running.



# File 'lib/raktr.rb', line 472

def thread
    @thread
end

#wait ⇒ Object

Waits for the Reactor to stop #running?.

Raises:



# File 'lib/raktr.rb', line 359

def wait
    fail_if_not_running

    @done_signal.pop
    true
end
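
#wait blocks on the same `::Queue` that #run pushes to when the loop ends. The handshake can be sketched with plain threads; the worker thread here is a stand-in for the Reactor loop:

```ruby
done_signal = Queue.new
events      = []

# Stands in for the loop thread: finish work, then signal completion.
worker = Thread.new do
    events << :loop_finished
    done_signal << nil
end

# Stands in for #wait: blocks until the signal arrives.
done_signal.pop
events << :waiter_resumed

worker.join
```

`Queue#pop` blocks until something is pushed, so the waiter cannot resume before the loop has signalled that it is done.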