Class: Raktr

Inherits:

  • Object
Defined in:
lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb

Overview

This file is part of the Raktr project and may be subject to redistribution and commercial restrictions. Please see the Raktr web site for more information on licensing and terms of use.

Direct Known Subclasses

Global

Defined Under Namespace

Classes: Connection, Error, Global, Iterator, Queue, Tasks

Constant Summary

DEFAULT_OPTIONS = {
    select_timeout:    0.02,
    max_tick_interval: 0.02
}

VERSION = '0.1.0'

Constructor Details

#initialize(options = {}) ⇒ Raktr

Returns a new instance of Raktr.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_tick_interval (Integer, nil) — default: 0.02

    How long to wait for each tick when no connections are available for processing.

  • :select_timeout (Integer) — default: 0.02

    How long to wait for connection activity before continuing to the next tick.



# File 'lib/raktr.rb', line 128

def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end
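
The constructor overlays the caller's options on DEFAULT_OPTIONS with Hash#merge. A minimal sketch of that precedence rule, using a stand-in constant and helper (`SKETCH_DEFAULTS` and `resolve_options` are hypothetical names, not part of Raktr):

```ruby
# Stand-in for Raktr::DEFAULT_OPTIONS (values copied from the constant summary).
SKETCH_DEFAULTS = {
    select_timeout:    0.02,
    max_tick_interval: 0.02
}.freeze

# Mirrors `DEFAULT_OPTIONS.merge( options )`: keys supplied by the caller win,
# missing keys fall back to the defaults.
def resolve_options( options = {} )
    SKETCH_DEFAULTS.merge( options )
end

p resolve_options( select_timeout: 0.5 )
```

Note that an explicit `max_tick_interval: nil` replaces the default with `nil` rather than falling back to it, because Hash#merge only looks at which keys are present.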

Instance Attribute Details

#connections ⇒ Array<Connection> (readonly)

Returns Attached connections.

Returns:



# File 'lib/raktr.rb', line 74

def connections
  @connections
end

#max_tick_interval ⇒ Integer?

Returns Amount of time to wait for a connection.

Returns:

  • (Integer, nil)

    Amount of time to wait for a connection.



# File 'lib/raktr.rb', line 70

def max_tick_interval
  @max_tick_interval
end

#ticks ⇒ Integer (readonly)

Returns Amount of ticks.

Returns:

  • (Integer)

    Amount of ticks.



# File 'lib/raktr.rb', line 78

def ticks
  @ticks
end

Class Method Details

.global ⇒ Raktr

Returns Lazy-loaded, globally accessible Reactor.

Returns:

  • (Raktr)

    Lazy-loaded, globally accessible Reactor.



# File 'lib/raktr.rb', line 89

def global
    @raktr ||= Global.instance
end

.jruby? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/raktr.rb', line 116

def jruby?
    RUBY_PLATFORM == 'java'
end

.stop ⇒ Object

Stops the global Reactor instance and destroys it. The next call to global will return a new instance.



# File 'lib/raktr.rb', line 95

def stop
    return if !@raktr

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @raktr = nil
end
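
The reset in `.stop` relies on an implementation detail of Ruby's standard Singleton module, which caches the instance in the class-level ivar `@singleton__instance__`. A minimal sketch of the same trick on a stand-in class (`SketchGlobal` and `reset_singleton` are hypothetical names):

```ruby
require 'singleton'

# Hypothetical stand-in for Raktr::Global.
class SketchGlobal
    include Singleton
end

# Same trick as `Raktr.stop`: clear the ivar that the Singleton module uses
# to cache the instance, so the next `.instance` call builds a new object.
def reset_singleton( klass )
    klass.instance_variable_set( :@singleton__instance__, nil )
end

first = SketchGlobal.instance
reset_singleton( SketchGlobal )
second = SketchGlobal.instance

# `first` and `second` are now two distinct objects.
puts first.equal?( second )
```

Because this touches a private ivar of the standard library, it could break if Singleton's internals change, which is presumably why the source comment calls it "not the cleanest solution".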

.supports_unix_sockets? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/raktr.rb', line 108

def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end

Instance Method Details

#at_interval(interval, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • interval (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run at every `interval` seconds.

Raises:



# File 'lib/raktr.rb', line 464

def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end
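
The internals of Tasks::Periodic are not shown on this page. As a rough illustration of why time accuracy cannot be guaranteed, here is a sketch of a periodic task that is only checked when the loop polls it (`SketchPeriodic` and its injectable `clock` are assumptions made for the example, not the library's implementation):

```ruby
# Hypothetical periodic task: runs its block whenever at least `interval`
# seconds have passed since it was created or last ran. The clock is
# injectable so the behaviour can be checked without sleeping.
class SketchPeriodic
    def initialize( interval, clock = -> { Process.clock_gettime( Process::CLOCK_MONOTONIC ) }, &block )
        @interval = interval
        @clock    = clock
        @block    = block
        @next_at  = @clock.call + @interval
    end

    # Called once per tick by the loop; returns true if the block ran.
    def call
        now = @clock.call
        return false if now < @next_at

        @block.call
        @next_at = now + @interval
        true
    end
end

now  = 0.0
runs = 0
task = SketchPeriodic.new( 1.0, -> { now } ) { runs += 1 }

task.call   # polled too early, nothing happens
now = 1.5
task.call   # polled late: the block runs at 1.5 rather than at 1.0
puts runs
```

The task can only fire when a tick reaches it, so a slow tick delays it by however long that tick took.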

#attach(connection) ⇒ Object

Attaches a connection to the Reactor loop.

Parameters:

Raises:



# File 'lib/raktr.rb', line 507

def attach( connection )
    return if attached? connection

    schedule do
        connection.raktr = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end

#attached?(connection) ⇒ Bool

Returns `true` if the connection is attached, `false` otherwise.

Returns:

  • (Bool)

    `true` if the connection is attached, `false` otherwise.



# File 'lib/raktr.rb', line 536

def attached?( connection )
    @connections.include? connection.to_io
end
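
Both #attach and #attached? index connections by the object returned from `to_io`. A minimal sketch of that bookkeeping, leaving out scheduling and callbacks (`SketchRegistry` and `SketchConnection` are hypothetical names):

```ruby
# Hypothetical registry mirroring the `@connections` Hash (Socket => Connection).
class SketchRegistry
    def initialize
        @connections = {}
    end

    # Index the connection by its underlying IO, as #attach does.
    def attach( connection )
        return if attached?( connection )
        @connections[connection.to_io] = connection
    end

    def detach( connection )
        @connections.delete( connection.to_io )
    end

    def attached?( connection )
        @connections.include?( connection.to_io )
    end
end

# Any object responding to #to_io can be tracked; a Struct wrapping one end
# of a pipe is enough for the example.
SketchConnection = Struct.new( :to_io )

reader, writer = IO.pipe
connection     = SketchConnection.new( reader )
registry       = SketchRegistry.new

registry.attach( connection )
puts registry.attached?( connection )
registry.detach( connection )
puts registry.attached?( connection )

reader.close
writer.close
```

Keying by IO presumably lets the loop map a ready socket straight back to its connection.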

#connect(host, port, handler = Connection, *handler_options) ⇒ Connection
#connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.

Connects to a peer.

Overloads:

  • #connect(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

  • #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to connect.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

Returns:

  • (Connection)

    Connected instance of `handler`.

Raises:



# File 'lib/raktr.rb', line 188

def connect( *args, &block )
    fail_if_not_running

    tls = nil
    if args.last.is_a?( Hash ) && args.last.include?( :tls )
        tls = args.last[:tls]
    else
        tls = determine_tls_options( :server )
    end

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.raktr = self
    block.call connection if block_given?

    begin
        socket = options[:unix_socket] ?
            connect_unix( options[:unix_socket] ) : connect_tcp

        connection.configure options.merge( socket: socket, tls: tls, role: :client )
        attach connection
    rescue => e
        connection.close e
        raise
    end

    connection
end
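
The first step of #connect picks the TLS settings: an explicit `:tls` entry in a trailing Hash wins, otherwise a default is computed. A minimal sketch of that check in isolation (`extract_tls` is a hypothetical helper; the block stands in for the private `determine_tls_options`, whose behaviour is not documented here):

```ruby
# Mirrors the check at the top of #connect: if the last argument is a Hash
# that has a :tls key, use its value; otherwise ask the fallback block.
def extract_tls( args )
    last = args.last
    return last[:tls] if last.is_a?( Hash ) && last.include?( :tls )

    yield
end

explicit = extract_tls( ['example.com', 443, { tls: { verify: true } }] ) { :computed_default }
implicit = extract_tls( ['/tmp/example.sock'] ) { :computed_default }

p explicit
p implicit
```

Since the test is `include?( :tls )`, passing `tls: nil` explicitly yields `nil` and skips the fallback.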

#create_iterator(list, concurrency = 1000) ⇒ Raktr::Iterator

Returns New Raktr::Iterator with `self` as the scheduler.

Parameters:

  • list (#to_a)

    List to iterate.

  • concurrency (Integer) (defaults to: 1000)

    Parallel workers to spawn.

Returns:

  • (Raktr::Iterator)

    New Raktr::Iterator with `self` as the scheduler.



# File 'lib/raktr.rb', line 152

def create_iterator( list, concurrency = 1000 )
    Raktr::Iterator.new( self, list, concurrency )
end

#create_queue ⇒ Raktr::Queue

Returns New Raktr::Queue with `self` as the scheduler.

Returns:

  • (Raktr::Queue)

    New Raktr::Queue with `self` as the scheduler.



# File 'lib/raktr.rb', line 158

def create_queue
    Raktr::Queue.new self
end

#delay(time, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Parameters:

  • time (Float)

    Time in seconds.

  • block (Block)

    Schedules a task to be run in `time` seconds.

Raises:



# File 'lib/raktr.rb', line 478

def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end
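
Tasks::Delayed is not shown on this page. As a sketch of the one-shot behaviour described above, the following assumes a task that fires the first time it is polled after the delay has elapsed and is then dropped from the list (`SketchDelayed` and `poll` are hypothetical names):

```ruby
# Hypothetical delayed task: fires once, the first time it is polled after
# `delay` seconds have elapsed, then reports itself as done.
class SketchDelayed
    def initialize( delay, clock, &block )
        @due_at = clock.call + delay
        @clock  = clock
        @block  = block
        @done   = false
    end

    def done?
        @done
    end

    def call
        return if @done || @clock.call < @due_at

        @done = true
        @block.call
    end
end

# One pass over the task list, as a tick would do; finished tasks are dropped.
def poll( tasks )
    tasks.each( &:call )
    tasks.reject!( &:done? )
    tasks
end

now   = 0.0
tasks = [SketchDelayed.new( 2.0, -> { now } ) { puts 'fired' }]

poll( tasks )   # not due yet, the task stays queued
now = 2.1
poll( tasks )   # due: the block runs and the task is removed
puts tasks.size
```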

#detach(connection) ⇒ Object

Detaches a connection from the Reactor loop.

Parameters:

Raises:



# File 'lib/raktr.rb', line 524

def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.raktr = nil
    end
end

#in_same_thread? ⇒ Bool

Returns `true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Returns:

  • (Bool)

    `true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Raises:



# File 'lib/raktr.rb', line 495

def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end

#listen(host, port, handler = Connection, *handler_options) ⇒ Connection
#listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the `handler`'s Raktr::Connection::Callbacks#on_close method as a `reason` argument.

Listens for incoming connections.

Overloads:

  • #listen(host, port, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • host (String)
    • port (Integer)
    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::HostNotFound)

      If the `host` is invalid.

    • (Connection::Error::Permission)

      If the `port` could not be opened due to a permission error.

  • #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Parameters:

    • unix_socket (String)

      Path to the UNIX socket to create.

    • handler (Connection) (defaults to: Connection)

      Connection handler, should be a subclass of Connection.

    • handler_options (Hash)

      Options to pass to the `#initialize` method of the `handler`.

    Raises:

    • (Connection::Error::Permission)

      If the `unix_socket` file could not be created due to a permission error.

Returns:

  • (Connection)

    Listening instance of `handler`.

Raises:



# File 'lib/raktr.rb', line 252

def listen( *args, &block )
    fail_if_not_running

    tls = nil
    if args.last.is_a?( Hash ) && args.last.include?( :tls )
        tls = args.last[:tls]
    else
        tls = determine_tls_options( :server )
    end

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.raktr = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        socket = options[:unix_socket] ?
            listen_unix( options[:unix_socket] ) :
            listen_tcp( options[:host], options[:port] )

        server.configure options.merge( socket: socket, tls: tls, role: :server, server_handler: server_handler )
        attach server
    rescue => e
        server.close e
        raise
    end

    server
end
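
In #listen the handler is built through a `server_handler` proc, which is then also handed to `configure`. The page does not say how `configure` uses it; the sketch below assumes it serves as a factory that yields a fresh, identically configured handler on each call (`SketchHandler` and `build_handler_factory` are hypothetical names):

```ruby
# Hypothetical handler class; a real one would subclass Raktr::Connection.
class SketchHandler
    attr_reader   :options
    attr_accessor :label

    def initialize( options = {} )
        @options = options
    end
end

# Builds a factory like the `server_handler` proc in #listen: every call
# returns a new handler, passed through the optional configuration block.
def build_handler_factory( handler_class, *handler_options, &block )
    proc do
        handler = handler_class.new( *handler_options )
        block.call( handler ) if block
        handler
    end
end

factory = build_handler_factory( SketchHandler, { greeting: 'hello' } ) do |handler|
    handler.label = 'configured'
end

first  = factory.call
second = factory.call
puts first.equal?( second )
puts second.label
```

Wrapping construction in a proc keeps the handler class, its options and the caller's block together, so the same setup can be replayed whenever another handler is needed.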

#next_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at the next tick.

Raises:



# File 'lib/raktr.rb', line 450

def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end

#on_error(&block) ⇒ Object

Parameters:

  • block (Block)

    Passes exceptions raised in the Reactor #thread to a task.

Raises:



# File 'lib/raktr.rb', line 402

def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end

#on_shutdown(&block) ⇒ Object

Parameters:

Raises:



# File 'lib/raktr.rb', line 440

def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end

#on_tick(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run at each tick.

Raises:



# File 'lib/raktr.rb', line 412

def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end
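
#next_tick queues a Tasks::OneOff while #on_tick queues a Tasks::Persistent. Neither class is shown here, so the following is only a sketch of the difference their names suggest: one-off tasks run on a single tick, persistent ones on every tick (`SketchTaskList` is a hypothetical name):

```ruby
# Hypothetical task list contrasting one-off and persistent tasks.
class SketchTaskList
    def initialize
        @one_off    = []
        @persistent = []
    end

    # Like #next_tick: run once on the following tick, then forget.
    def next_tick( &block )
        @one_off << block
        nil
    end

    # Like #on_tick: run on every tick.
    def on_tick( &block )
        @persistent << block
        nil
    end

    # One tick: drain the one-off tasks queued so far, then run the
    # persistent ones.
    def tick
        pending, @one_off = @one_off, []
        pending.each( &:call )
        @persistent.each( &:call )
        nil
    end
end

tasks = SketchTaskList.new
tasks.next_tick { puts 'once' }
tasks.on_tick   { puts 'every tick' }

2.times { tasks.tick }
```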

#run(&block) ⇒ Object

Starts the Reactor loop and blocks the current #thread until #stop is called.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Raises:



# File 'lib/raktr.rb', line 308

def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call( self ) if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end
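
The structure of #run can be condensed into a small loop: run the tasks, route any StandardError to the error handlers, check the stop flag, count the tick. A simplified sketch without connection processing or shutdown tasks (`SketchLoop` is a hypothetical name):

```ruby
# Hypothetical miniature of the loop in #run.
class SketchLoop
    attr_reader :ticks

    def initialize
        @tasks          = []
        @error_handlers = []
        @stop           = false
        @ticks          = 0
    end

    def on_tick( &block )
        @tasks << block
        nil
    end

    def on_error( &block )
        @error_handlers << block
        nil
    end

    def stop
        @stop = true
    end

    def run
        loop do
            begin
                @tasks.each( &:call )
            rescue => e
                # Errors do not end the loop; they go to the handlers.
                @error_handlers.each { |handler| handler.call( e ) }
            end
            break if @stop

            @ticks += 1
        end
    end
end

reactor = SketchLoop.new
count   = 0

reactor.on_error { |e| puts "handled: #{e.message}" }
reactor.on_tick do
    count += 1
    raise 'boom' if count == 2
    reactor.stop if count == 3
end

reactor.run
puts reactor.ticks
```

As in the original, the stop flag is checked before the tick counter is incremented, so the tick that requests the stop is not counted.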

#run_block(&block) ⇒ Object

Starts the Reactor loop, blocks the current #thread while the given `block` executes and then calls #stop.

Parameters:

  • block (Block)

    Block to call.

Raises:



# File 'lib/raktr.rb', line 387

def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end

#run_in_thread(&block) ⇒ Thread

Runs the Reactor in a thread and blocks until it is #running?.

Parameters:

  • block (Block)

    Block to call right before initializing the loop.

Returns:

  • (Thread)

    Reactor#thread

Raises:



# File 'lib/raktr.rb', line 354

def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end

#running? ⇒ Bool

Returns `true` if the Reactor is running, `false` otherwise.

Returns:

  • (Bool)

    `true` if the Reactor is running, `false` otherwise.



# File 'lib/raktr.rb', line 290

def running?
    thread && thread.alive?
end

#schedule(&block) ⇒ Object

Parameters:

  • block (Block)

    Schedules a task to be run as soon as possible, either immediately if the caller is in the same thread, or at the #next_tick otherwise.

Raises:



# File 'lib/raktr.rb', line 424

def schedule( &block )
    fail_if_not_running

    if in_same_thread?
        block.call self
    else
        next_tick(&block)
    end

    nil
end
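
The dispatch rule in #schedule (run inline from the loop's own thread, defer otherwise) can be tried out in isolation. A minimal sketch in which a plain Queue stands in for the task list (`SketchScheduler` and `run_pending` are hypothetical names):

```ruby
# Hypothetical loop owner that remembers which thread it belongs to.
class SketchScheduler
    def initialize
        @thread  = Thread.current
        @pending = Queue.new
    end

    # Same rule as #schedule: run now when called from the loop's thread,
    # otherwise queue the block for the loop to pick up.
    def schedule( &block )
        if Thread.current == @thread
            block.call
        else
            @pending << block
        end

        nil
    end

    # What the loop would do on its next tick.
    def run_pending
        @pending.pop.call until @pending.empty?
        nil
    end
end

scheduler = SketchScheduler.new

scheduler.schedule { puts 'ran inline' }
Thread.new { scheduler.schedule { puts 'ran on next tick' } }.join
scheduler.run_pending
```

Deferring calls from other threads means the block always executes in the loop's thread, so state owned by the loop is never touched concurrently.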

#stop ⇒ Object

Stops the Reactor loop as soon as possible.

Raises:



# File 'lib/raktr.rb', line 297

def stop
    schedule { @stop = true }
end

#thread ⇒ Thread?

Returns Thread of the loop, `nil` if not running.

Returns:

  • (Thread, nil)

    Thread of the loop, `nil` if not running.



# File 'lib/raktr.rb', line 486

def thread
    @thread
end

#wait ⇒ Object

Waits for the Reactor to stop #running?.

Raises:



# File 'lib/raktr.rb', line 373

def wait
    fail_if_not_running

    @done_signal.pop
    true
end
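
#wait blocks on `@done_signal`, the Queue that #run pushes to once the loop has finished. A minimal sketch of that handshake between a background thread and a waiting caller (`SketchWorker` is a hypothetical name, and the work block stands in for the loop):

```ruby
# Hypothetical worker showing the handshake behind #run_in_thread and #wait:
# the work runs in its own thread and pushes onto a Queue when it finishes,
# and #wait blocks on Queue#pop until that happens.
class SketchWorker
    def initialize
        @done_signal = Queue.new
        @thread      = nil
    end

    def running?
        !!(@thread && @thread.alive?)
    end

    def run_in_thread( &work )
        @thread = Thread.new do
            work.call
            @done_signal << nil
        end
    end

    def wait
        @done_signal.pop
        true
    end
end

worker = SketchWorker.new
worker.run_in_thread { sleep 0.05 }
worker.wait
puts 'worker finished'
```

Queue#pop suspends the caller without polling, which is why a queue works well as a completion signal.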