Class: Raktr

Inherits:
Object
  • Object
show all
Defined in:
lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb

Overview

This file is part of the Raktr project and may be subject to

redistribution and commercial restrictions. Please see the Raktr
web site for more information on licensing and terms of use.

Direct Known Subclasses

Global

Defined Under Namespace

Classes: Connection, Error, Global, Iterator, Queue, Tasks

Constant Summary collapse

DEFAULT_OPTIONS =
{
    select_timeout:    0.02,
    max_tick_interval: 0.02
}
VERSION =
'0.0.2'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Raktr

Returns a new instance of Raktr.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_tick_interval (Integer, nil) — default: 0.02

    How long to wait for each tick when no connections are available for processing.

  • :select_timeout (Integer) — default: 0.02

    How long to wait for connection activity before continuing to the next tick.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/raktr.rb', line 128

def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end

Instance Attribute Details

#connectionsArray<Connection> (readonly)

Returns Attached connections.

Returns:



74
75
76
# File 'lib/raktr.rb', line 74

def connections
  @connections
end

#max_tick_intervalInteger?

Returns Amount of time to wait for a connection.

Returns:

  • (Integer, nil)

    Amount of time to wait for a connection.



70
71
72
# File 'lib/raktr.rb', line 70

def max_tick_interval
  @max_tick_interval
end

#ticksInteger (readonly)

Returns Amount of ticks.

Returns:

  • (Integer)

    Amount of ticks.



78
79
80
# File 'lib/raktr.rb', line 78

def ticks
  @ticks
end

Class Method Details

.globalReactor

Returns Lazy-loaded, globally accessible Reactor.

Returns:

  • (Reactor)

    Lazy-loaded, globally accessible Reactor.



89
90
91
# File 'lib/raktr.rb', line 89

def global
    @raktr ||= Global.instance
end

.jruby?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/raktr.rb', line 116

def jruby?
    RUBY_PLATFORM == 'java'
end

.stopObject

Stops the global Reactor instance and destroys it. The next call to global will return a new instance.



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/raktr.rb', line 95

def stop
    return if !@raktr

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @raktr = nil
end

.supports_unix_sockets?Boolean

Returns:

  • (Boolean)


108
109
110
111
112
113
114
# File 'lib/raktr.rb', line 108

def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end

Instance Method Details

#at_interval(interval, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • interval (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run at every ‘interval` seconds.

Raises:



448
449
450
451
452
# File 'lib/raktr.rb', line 448

def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end

#attach(connection) ⇒ Object

Attaches a connection to the Reactor loop.

Parameters:

Raises:



491
492
493
494
495
496
497
498
499
# File 'lib/raktr.rb', line 491

def attach( connection )
    return if attached? connection

    schedule do
        connection.raktr = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end

#attached?(connection) ⇒ Bool

Returns ‘true` if the connection is attached, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the connection is attached, `false` otherwise.



520
521
522
# File 'lib/raktr.rb', line 520

def attached?( connection )
    @connections.include? connection.to_io
end

#connect(host, port, handler = Connection, *handler_options) ⇒ Connection #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.

Connects to a peer.

Overloads:

  • #connect(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the ‘#initialize` method of the `handler`.

  • #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to connect.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the ‘#initialize` method of the `handler`.

Returns:

  • (Connection)

    Connected instance of ‘handler`.

Raises:



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/raktr.rb', line 188

def connect( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.raktr = self
    block.call connection if block_given?

    begin
        socket = options[:unix_socket] ?
            connect_unix( options[:unix_socket] ) : connect_tcp

        connection.configure options.merge( socket: socket, role: :client )
        attach connection
    rescue => e
        connection.close e
    end

    connection
end

#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator

Returns New Reactor::Iterator with ‘self` as the scheduler.

Parameters:

  • list (#to_a)

    List to iterate.

  • concurrency (Integer) (defaults to: 1)

    Parallel workers to spawn.

Returns:

  • (Reactor::Iterator)

    New Reactor::Iterator with ‘self` as the scheduler.



152
153
154
# File 'lib/raktr.rb', line 152

def create_iterator( list, concurrency = 1 )
    Raktr::Iterator.new( self, list, concurrency )
end

#create_queueReactor::Queue

Returns New Reactor::Queue with ‘self` as the scheduler.

Returns:

  • (Reactor::Queue)

    New Reactor::Queue with ‘self` as the scheduler.



158
159
160
# File 'lib/raktr.rb', line 158

def create_queue
    Raktr::Queue.new self
end

#delay(time, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • time (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run in ‘time` seconds.

Raises:



462
463
464
465
466
# File 'lib/raktr.rb', line 462

def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end

#detach(connection) ⇒ Object

Detaches a connection from the Reactor loop.

Parameters:

Raises:



508
509
510
511
512
513
514
515
516
# File 'lib/raktr.rb', line 508

def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.raktr = nil
    end
end

#in_same_thread?Bool

Returns ‘true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Raises:



479
480
481
482
# File 'lib/raktr.rb', line 479

def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end

#listen(host, port, handler = Connection, *handler_options) ⇒ Connection #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.

Listens for incoming connections.

Overloads:

  • #listen(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the ‘#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::HostNotFound)

      If the ‘host` is invalid.

    • (Connection::Error::Permission)

      If the ‘port` could not be opened due to a permission error.

  • #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to create.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the ‘#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::Permission)

      If the ‘unix_socket` file could not be created due to a permission error.

Returns:

  • (Connection)

    Listening instance of ‘handler`.

Raises:



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/raktr.rb', line 244

def listen( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.raktr = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        socket = options[:unix_socket] ?
            listen_unix( options[:unix_socket] ) :
            listen_tcp( options[:host], options[:port] )

        server.configure options.merge( socket: socket, role: :server, server_handler: server_handler )
        attach server
    rescue => e
        server.close e
    end

    server
end

#next_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at the next tick.

Raises:



434
435
436
437
438
# File 'lib/raktr.rb', line 434

def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end

#on_error(&block) ⇒ Object

Parameters:

  • block (Block)

    Passes exceptions raised in the Reactor #thread to a task.

Raises:



386
387
388
389
390
# File 'lib/raktr.rb', line 386

def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end

#on_shutdown(&block) ⇒ Object

Parameters:

Raises:



424
425
426
427
428
# File 'lib/raktr.rb', line 424

def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end

#on_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at each tick.

Raises:



396
397
398
399
400
# File 'lib/raktr.rb', line 396

def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end

#run(&block) ⇒ Object

Starts the Reactor loop and blocks the current #thread until #stop is called.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Raises:



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/raktr.rb', line 292

def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call( self ) if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end

#run_block(&block) ⇒ Object

Starts the Reactor loop, blocks the current #thread while the given ‘block` executes and then #stops it.

Parameters:

  • block (Block)

    Block to call.

Raises:



371
372
373
374
375
376
377
378
379
# File 'lib/raktr.rb', line 371

def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end

#run_in_thread(&block) ⇒ Thread

Runs the Reactor in a thread and blocks until it is #running?.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Returns:

  • (Thread)

    Reactor#thread

Raises:



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/raktr.rb', line 338

def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end

#running?Bool

Returns ‘true` if the Reactor is running, `false` otherwise.

Returns:

  • (Bool)

    ‘true` if the Reactor is running, `false` otherwise.



274
275
276
# File 'lib/raktr.rb', line 274

def running?
    thread && thread.alive?
end

#schedule(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run as soon as possible, either immediately if the caller is in the same thread, or at the #next_tick otherwise.

Raises:



408
409
410
411
412
413
414
415
416
417
418
# File 'lib/raktr.rb', line 408

def schedule( &block )
    fail_if_not_running

    if in_same_thread?
        block.call self
    else
        next_tick(&block)
    end

    nil
end

#stopObject

Stops the Reactor loop as soon as possible.

Raises:



281
282
283
# File 'lib/raktr.rb', line 281

def stop
    schedule { @stop = true }
end

#threadThread?

Returns Thread of the loop, ‘nil` if not running.

Returns:

  • (Thread, nil)

    Thread of the loop, ‘nil` if not running.



470
471
472
# File 'lib/raktr.rb', line 470

def thread
    @thread
end

#waitObject

Waits for the Reactor to stop #running?.

Raises:



357
358
359
360
361
362
# File 'lib/raktr.rb', line 357

def wait
    fail_if_not_running

    @done_signal.pop
    true
end