Class: Raktr

Inherits:
Object
  • Object
show all
Defined in:
lib/raktr.rb,
lib/raktr/queue.rb,
lib/raktr/tasks.rb,
lib/raktr/global.rb,
lib/raktr/version.rb,
lib/raktr/iterator.rb,
lib/raktr/connection.rb,
lib/raktr/tasks/base.rb,
lib/raktr/tasks/delayed.rb,
lib/raktr/tasks/one_off.rb,
lib/raktr/connection/tls.rb,
lib/raktr/tasks/periodic.rb,
lib/raktr/tasks/persistent.rb,
lib/raktr/connection/callbacks.rb,
lib/raktr/connection/peer_info.rb

Overview

This file is part of the Raktr project and may be subject to

redistribution and commercial restrictions. Please see the Raktr
web site for more information on licensing and terms of use.

Direct Known Subclasses

Global

Defined Under Namespace

Classes: Connection, Error, Global, Iterator, Queue, Tasks

Constant Summary collapse

DEFAULT_OPTIONS =
{
    select_timeout:    0.02,
    max_tick_interval: 0.02
}
VERSION =
'0.0.3'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Raktr

Returns a new instance of Raktr.

Options Hash (options):

  • :max_tick_interval (Integer, nil) — default: 0.02

    How long to wait for each tick when no connections are available for processing.

  • :select_timeout (Integer) — default: 0.02

    How long to wait for connection activity before continuing to the next tick.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/raktr.rb', line 128

def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end

Instance Attribute Details

#connectionsArray<Connection> (readonly)



74
75
76
# File 'lib/raktr.rb', line 74

def connections
  @connections
end

#max_tick_intervalInteger?



70
71
72
# File 'lib/raktr.rb', line 70

def max_tick_interval
  @max_tick_interval
end

#ticksInteger (readonly)



78
79
80
# File 'lib/raktr.rb', line 78

def ticks
  @ticks
end

Class Method Details

.globalReactor



89
90
91
# File 'lib/raktr.rb', line 89

def global
    @raktr ||= Global.instance
end

.jruby?Boolean



116
117
118
# File 'lib/raktr.rb', line 116

def jruby?
    RUBY_PLATFORM == 'java'
end

.stopObject

Stops the global Reactor instance and destroys it. The next call to global will return a new instance.



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/raktr.rb', line 95

def stop
    return if !@raktr

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @raktr = nil
end

.supports_unix_sockets?Boolean



108
109
110
111
112
113
114
# File 'lib/raktr.rb', line 108

def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end

Instance Method Details

#at_interval(interval, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Raises:



450
451
452
453
454
# File 'lib/raktr.rb', line 450

def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end

#attach(connection) ⇒ Object

Attaches a connection to the Reactor loop.

Raises:



493
494
495
496
497
498
499
500
501
# File 'lib/raktr.rb', line 493

def attach( connection )
    return if attached? connection

    schedule do
        connection.raktr = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end

#attached?(connection) ⇒ Bool



522
523
524
# File 'lib/raktr.rb', line 522

def attached?( connection )
    @connections.include? connection.to_io
end

#connect(host, port, handler = Connection, *handler_options) ⇒ Connection #connect(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.

Connects to a peer.

Raises:



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/raktr.rb', line 188

def connect( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.raktr = self
    block.call connection if block_given?

    begin
        socket = options[:unix_socket] ?
            connect_unix( options[:unix_socket] ) : connect_tcp

        connection.configure options.merge( socket: socket, role: :client )
        attach connection
    rescue => e
        connection.close e
        raise
    end

    connection
end

#create_iterator(list, concurrency = 1) ⇒ Reactor::Iterator



152
153
154
# File 'lib/raktr.rb', line 152

def create_iterator( list, concurrency = 1 )
    Raktr::Iterator.new( self, list, concurrency )
end

#create_queueReactor::Queue



158
159
160
# File 'lib/raktr.rb', line 158

def create_queue
    Raktr::Queue.new self
end

#delay(time, &block) ⇒ Object

Note:

Time accuracy cannot be guaranteed.

Raises:



464
465
466
467
468
# File 'lib/raktr.rb', line 464

def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end

#detach(connection) ⇒ Object

Detaches a connection from the Reactor loop.

Raises:



510
511
512
513
514
515
516
517
518
# File 'lib/raktr.rb', line 510

def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.raktr = nil
    end
end

#in_same_thread?Bool

Returns ‘true` if the caller is in the same #thread as the reactor loop, `false` otherwise.

Raises:



481
482
483
484
# File 'lib/raktr.rb', line 481

def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end

#listen(host, port, handler = Connection, *handler_options) ⇒ Connection #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

Note:

Connection errors will be passed to the ‘handler`’s Raktr::Connection::Callbacks#on_close method as a ‘reason` argument.

Listens for incoming connections.

Overloads:

  • #listen(host, port, handler = Connection, *handler_options) ⇒ Connection

    Raises:

    • (Connection::Error::HostNotFound)

      If the ‘host` is invalid.

    • (Connection::Error::Permission)

      If the ‘port` could not be opened due to a permission error.

  • #listen(unix_socket, handler = Connection, *handler_options) ⇒ Connection

    Raises:

    • (Connection::Error::Permission)

      If the ‘unix_socket` file could not be created due to a permission error.

Raises:



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/raktr.rb', line 245

def listen( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.raktr = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        socket = options[:unix_socket] ?
            listen_unix( options[:unix_socket] ) :
            listen_tcp( options[:host], options[:port] )

        server.configure options.merge( socket: socket, role: :server, server_handler: server_handler )
        attach server
    rescue => e
        server.close e
        raise
    end

    server
end

#next_tick(&block) ⇒ Object

Raises:



436
437
438
439
440
# File 'lib/raktr.rb', line 436

def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end

#on_error(&block) ⇒ Object

Raises:



388
389
390
391
392
# File 'lib/raktr.rb', line 388

def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end

#on_shutdown(&block) ⇒ Object

Raises:



426
427
428
429
430
# File 'lib/raktr.rb', line 426

def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end

#on_tick(&block) ⇒ Object

Raises:



398
399
400
401
402
# File 'lib/raktr.rb', line 398

def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end

#run(&block) ⇒ Object

Starts the Reactor loop and blocks the current #thread until #stop is called.

Raises:



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/raktr.rb', line 294

def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call( self ) if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end

#run_block(&block) ⇒ Object

Starts the Reactor loop, blocks the current #thread while the given ‘block` executes and then #stops it.

Raises:



373
374
375
376
377
378
379
380
381
# File 'lib/raktr.rb', line 373

def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end

#run_in_thread(&block) ⇒ Thread

Runs the Reactor in a thread and blocks until it is #running?.

Raises:



340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/raktr.rb', line 340

def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end

#running?Bool



276
277
278
# File 'lib/raktr.rb', line 276

def running?
    thread && thread.alive?
end

#schedule(&block) ⇒ Object

Raises:



410
411
412
413
414
415
416
417
418
419
420
# File 'lib/raktr.rb', line 410

def schedule( &block )
    fail_if_not_running

    if in_same_thread?
        block.call self
    else
        next_tick(&block)
    end

    nil
end

#stopObject

Stops the Reactor loop as soon as possible.

Raises:



283
284
285
# File 'lib/raktr.rb', line 283

def stop
    schedule { @stop = true }
end

#threadThread?



472
473
474
# File 'lib/raktr.rb', line 472

def thread
    @thread
end

#waitObject

Waits for the Reactor to stop #running?.

Raises:



359
360
361
362
363
364
# File 'lib/raktr.rb', line 359

def wait
    fail_if_not_running

    @done_signal.pop
    true
end