Class: Libuv::Reactor

Inherits:
Object show all
Extended by:
Accessors, ClassMethods
Includes:
Assertions, Resource
Defined in:
lib/libuv/reactor.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

CRITICAL =
::Mutex.new

Constants included from Accessors

Accessors::Functions

Constants included from Assertions

Assertions::MSG_NO_PROC

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Accessors

reactor

Methods included from ClassMethods

create, current, default, new

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Constructor Details

#initialize(pointer) ⇒ Reactor

Initialize a reactor using an FFI::Pointer to a libuv reactor



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/libuv/reactor.rb', line 56

# Initialize a reactor using an FFI::Pointer to a libuv reactor.
#
# Sets up the internal handles the reactor relies on (work queue wake-up,
# next-tick timer, stop signal, prepare handle and signal handlers) and the
# default error notifier.
#
# @param pointer [FFI::Pointer] pointer to the underlying libuv reactor
def initialize(pointer) # :notnew:
    @pointer = pointer
    # The handle factories used below (async, timer, signal) read @reactor,
    # so it has to be assigned before any of them is called.
    @reactor = self
    @run_count = 0
    @ref_count = 0

    # Create an async call for scheduling work from other threads
    @run_queue = Queue.new
    @process_queue = @reactor.async method(:process_queue_cb)
    # NOTE(review): presumably unref'd so this internal handle alone does not
    # keep the loop alive - confirm against the handle implementation
    @process_queue.unref

    # Create a next tick timer
    @next_tick = @reactor.timer method(:next_tick_cb)
    @next_tick.unref

    # Create an async call for ending the reactor
    @stop_reactor = @reactor.async method(:stop_cb)
    @stop_reactor.unref

    # Libuv can prevent the application shutting down once the main thread has ended
    # The addition of a prepare function prevents this from happening.
    @reactor_prep = Libuv::Prepare.new(@reactor, method(:noop))
    @reactor_prep.unref
    @reactor_prep.start

    # Libuv ignores program interrupts by default.
    # We provide normal behaviour and allow this to be overridden
    # (the default handler can be replaced through on_program_interrupt)
    @on_signal = proc { stop_cb }
    sig_callback = method(:signal_cb)
    self.signal(:INT, sig_callback).unref
    self.signal(:HUP, sig_callback).unref
    self.signal(:TERM, sig_callback).unref

    # Notify of errors
    # The default notifier remembers the error so that run can re-raise it
    # once the loop has exited.
    @throw_on_exit = nil
    @reactor_notify_default = @reactor_notify = proc { |error|
        @throw_on_exit = error
    }
end

Instance Attribute Details

#run_countObject (readonly)

Returns the value of attribute run_count.



96
97
98
# File 'lib/libuv/reactor.rb', line 96

# Reader for the run counter, which #run increments each time it starts the loop.
#
# @return [Integer] the current value of @run_count
def run_count; @run_count; end

Instance Method Details

#all(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved. (thread safe)

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each value corresponding to the promise at the same index in the `promises` array. If any of the promises is resolved with a rejection, this resulting promise will be resolved with the same rejection.



241
242
243
# File 'lib/libuv/reactor.rb', line 241

# Combine several promises into one that resolves once every input promise
# has resolved (thread safe).
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise built by Q.all for this reactor
def all(*promises)
    combined = Q.all(@reactor, *promises)
    combined
end

#any(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when any of the input promises are resolved.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:



251
252
253
# File 'lib/libuv/reactor.rb', line 251

# Combine several promises into one that resolves as soon as any of the
# input promises resolves.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise built by Q.any for this reactor
def any(*promises)
    first_resolved = Q.any(@reactor, *promises)
    first_resolved
end

#async(callback = nil, &block) ⇒ ::Libuv::Async

Get a new Async handle

Returns:



375
376
377
378
379
380
# File 'lib/libuv/reactor.rb', line 375

# Get a new Async handle bound to this reactor.
#
# @param callback [Proc, nil] optional callback registered through the handle's
#   `progress` method; the block is used when no callback is passed
# @return [::Libuv::Async] the new handle
def async(callback = nil, &block)
    handler = callback || block
    Async.new(@reactor).tap do |async_handle|
        # Only register a handler when one was actually supplied
        async_handle.progress(handler) if handler
    end
end

#check(callback = nil, &blk) ⇒ ::Libuv::Check

Get a new Check handle

Returns:



360
361
362
# File 'lib/libuv/reactor.rb', line 360

# Get a new Check handle bound to this reactor.
#
# @param callback [Proc, nil] handler passed to the handle; the block is used
#   when no callback is given
# @return [::Libuv::Check] the new handle
def check(callback = nil, &blk)
    handler = callback || blk
    Check.new(@reactor, handler)
end

#defer::Libuv::Q::Deferred

Creates a deferred result object for where the result of an operation may only be returned at some point in the future or is being processed on a different thread (thread safe)



229
230
231
# File 'lib/libuv/reactor.rb', line 229

# Create a deferred result object for an operation whose result only becomes
# available later or is produced on a different thread (thread safe).
#
# @return [::Libuv::Q::Deferred] the deferred built by Q.defer for this reactor
def defer
    deferred = Q.defer(@reactor)
    deferred
end

#file(path, flags = 0, mode: 0, **opts, &blk) ⇒ ::Libuv::File

Opens a file and returns an object that can be used to manipulate it

Parameters:

  • path (String)

    the path of the file to open

  • flags (Integer) (defaults to: 0)

    see ruby File::Constants

  • mode (Integer) (defaults to: 0)

Returns:



442
443
444
445
446
447
# File 'lib/libuv/reactor.rb', line 442

# Open a file and return an object that can be used to manipulate it.
#
# @param path [String] path of the file
# @param flags [Integer] see ruby File::Constants
# @param mode [Integer] passed through to File.new as the mode: keyword
# @param opts [Hash] any further keyword options, forwarded untouched
# @return [::Libuv::File] the object returned by File.new
# @raise [ArgumentError] per the documented contract of assert_type, when an
#   argument has the wrong type
def file(path, flags = 0, mode: 0, **opts, &blk)
    # Checked in this order so the first offending argument is the one reported
    [
        [String, path, "path must be a String"],
        [Integer, flags, "flags must be an Integer"],
        [Integer, mode, "mode must be an Integer"]
    ].each { |expected, value, message| assert_type(expected, value, message) }

    File.new(@reactor, path, flags, mode: mode, **opts, &blk)
end

#filesystem::Libuv::Filesystem

Returns an object for manipulating the filesystem

Returns:



452
453
454
# File 'lib/libuv/reactor.rb', line 452

# Get an object for manipulating the filesystem through this reactor.
#
# @return [::Libuv::Filesystem] a new Filesystem instance
def filesystem
    fs = Filesystem.new(@reactor)
    fs
end

#finally(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved or rejected.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each [result, wasResolved] value pair corresponding to the promise at the same index in the `promises` array.



262
263
264
# File 'lib/libuv/reactor.rb', line 262

# Combine several promises into one that resolves once every input promise
# has either resolved or been rejected.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise built by Q.finally for this reactor
def finally(*promises)
    settled = Q.finally(@reactor, *promises)
    settled
end

#fs_event(path) ⇒ ::Libuv::FSEvent

Get a new FSEvent instance

Parameters:

  • path (String)

    the path to the file or folder for watching

Returns:

Raises:

  • (ArgumentError)

    if path is not a string



431
432
433
434
# File 'lib/libuv/reactor.rb', line 431

# Get a new FSEvent instance for a path.
#
# @param path [String] the path to the file or folder for watching
# @return [::Libuv::FSEvent] the new watcher
# @raise [ArgumentError] if path is not a string (raised by assert_type)
def fs_event(path)
    assert_type(String, path)
    watcher = FSEvent.new(@reactor, path)
    watcher
end

#handleObject



151
# File 'lib/libuv/reactor.rb', line 151

# The pointer this reactor was initialised with.
#
# @return [Object] the value stored in @pointer
def handle
    @pointer
end

#idle(callback = nil, &block) ⇒ ::Libuv::Idle

Get a new Idle handle

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on idle trigger

Returns:



368
369
370
# File 'lib/libuv/reactor.rb', line 368

# Get a new Idle handle bound to this reactor.
#
# @param callback [Proc, nil] the callback to be called on idle trigger; the
#   block is used when no callback is given
# @return [::Libuv::Idle] the new handle
def idle(callback = nil, &block)
    handler = callback || block
    Idle.new(@reactor, handler)
end

#inspectObject

Overridden because errors in JRuby can literally hang the VM when inspecting, as many classes reference this class



146
147
148
# File 'lib/libuv/reactor.rb', line 146

# Compact description: class name, object id in hex and the current length of
# the run queue (shown as NT).
#
# @return [String]
def inspect
    hex_id = __id__.to_s(16)
    pending = @run_queue.length
    "#<#{self.class}:0x#{hex_id} NT=#{pending}>"
end

#log(error, msg = nil, trace = nil) ⇒ Object

Notifies the reactor there was an event that should be logged

Parameters:

  • error (Exception)

    the error

  • msg (String|nil) (defaults to: nil)

    optional context on the error

  • trace (Array<String>) (defaults to: nil)

    optional additional trace of caller if async



502
503
504
# File 'lib/libuv/reactor.rb', line 502

# Notify the reactor of an event that should be logged by handing it to the
# currently installed notifier.
#
# @param error [Exception] the error
# @param msg [String, nil] optional context on the error
# @param trace [Array<String>, nil] optional additional trace of caller if async
# @return [Object] whatever the notifier returns
def log(error, msg = nil, trace = nil)
    notify = @reactor_notify
    notify.call(error, msg, trace)
end

#lookup(hostname, hint = :IPv4, port = 9, wait: true, &block) ⇒ ::Libuv::Dns

Lookup a hostname

Parameters:

  • hostname (String)

    the domain name to lookup

  • port (Integer, String) (defaults to: 9)

    the service being connected to

  • callback (Proc)

    the callback to be called on success

Returns:



416
417
418
419
420
421
422
423
424
# File 'lib/libuv/reactor.rb', line 416

# Look up a hostname.
#
# With wait: true and no block, the call returns the lookup's `results`.
# Otherwise the block (possibly nil) is attached with `then` and the Dns
# object itself is returned.
#
# @param hostname [String] the domain name to lookup
# @param hint [Symbol] passed through to Dns.new (defaults to :IPv4)
# @param port [Integer, String] the service being connected to
# @param wait [Boolean] passed through to Dns.new as wait:
# @return [::Libuv::Dns, Object] the Dns object, or its results
def lookup(hostname, hint = :IPv4, port = 9, wait: true, &block)
    query = Dns.new(@reactor, hostname, port, hint, wait: wait)
    return query.results if wait && !block_given?

    query.then(block)
    query
end

#lookup_error(err) ⇒ ::Libuv::Error

Look up an error code and return it as an error object

Parameters:

  • err (Integer)

    The error code to look up.

Returns:



292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/libuv/reactor.rb', line 292

# Look up an error code and return it as an error object.
#
# Never raises: any failure during the lookup is logged on the reactor and
# returned instead of the looked-up error.
#
# @param err [Integer] the error code to look up
# @return [::Libuv::Error, Exception] an instance of the matching
#   ::Libuv::Error constant, or the exception raised while looking it up
def lookup_error(err)
    code_name = ::Libuv::Ext.err_name(err)

    # Raised (and rescued below) rather than built directly so the returned
    # error carries a back-trace
    raise "error lookup failed for code #{err}" unless code_name

    description = ::Libuv::Ext.strerror(err)
    error_class = ::Libuv::Error.const_get(code_name.to_sym)
    error_class.new("#{description}, #{code_name}:#{err}")
rescue Exception => failure
    @reactor.log(failure, 'performing error lookup')
    failure
end

#next_tick(callback = nil, &block) ⇒ Object

Queue some work to be processed in the next iteration of the event reactor (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/libuv/reactor.rb', line 478

# Queue some work to be processed in the next iteration of the event reactor
# (thread safe).
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   the block is used when no callback is given
# @return [Reactor] self
# @raise [ArgumentError] if block is not given (raised by assert_block)
def next_tick(callback = nil, &block)
    work = callback || block
    assert_block(work)
    @run_queue << work

    unless reactor_thread?
        # Off the reactor thread: wake the reactor through the async handle
        @process_queue.call
        return self
    end

    # On the reactor thread: arm the next tick timer once per batch
    unless @next_tick_scheduled
        @next_tick.start(0)
        @next_tick_scheduled = true
        @next_tick.ref
    end

    self
end

#notifier(callback = nil, &blk) ⇒ ::Libuv::Q::Promise

Provides a promise notifier for receiving un-handled exceptions

Returns:



220
221
222
223
# File 'lib/libuv/reactor.rb', line 220

# Install the handler that receives errors passed to #log. Calling it with
# neither a callback nor a block restores the default notifier.
#
# @param callback [Proc, nil] the handler; the block is used when omitted
# @return [Reactor] self
def notifier(callback = nil, &blk)
    handler = callback || blk
    @reactor_notify = handler || @reactor_notify_default
    self
end

#nowFixnum

Get current time in milliseconds

Returns:

  • (Fixnum)


284
285
286
# File 'lib/libuv/reactor.rb', line 284

# Get current time in milliseconds, as reported by ::Libuv::Ext.now for this
# reactor's pointer.
#
# @return [Integer]
def now
    timestamp = ::Libuv::Ext.now(@pointer)
    timestamp
end

#on_program_interrupt(callback = nil, &block) ⇒ Object

Allows user defined behaviour when sig int is received



394
395
396
397
# File 'lib/libuv/reactor.rb', line 394

# Allows user defined behaviour when sig int is received by replacing the
# stored signal handler.
#
# @param callback [Proc, nil] the new handler; the block is used when omitted
# @return [Reactor] self
def on_program_interrupt(callback = nil, &block)
    handler = callback || block
    @on_signal = handler
    self
end

#pipe(ipc = false) ⇒ ::Libuv::Pipe

Get a new Pipe instance

Parameters:

  • ipc (true, false) (defaults to: false)

    indicate if a handle will be used for ipc, useful for sharing tcp socket between processes

Returns:



338
339
340
# File 'lib/libuv/reactor.rb', line 338

# Get a new Pipe instance bound to this reactor.
#
# @param ipc [true, false] indicate if a handle will be used for ipc, useful
#   for sharing tcp socket between processes
# @return [::Libuv::Pipe] the new pipe
def pipe(ipc = false)
    new_pipe = Pipe.new(@reactor, ipc)
    new_pipe
end

#prepare(callback = nil, &blk) ⇒ ::Libuv::Prepare

Get a new Prepare handle

Returns:



353
354
355
# File 'lib/libuv/reactor.rb', line 353

# Get a new Prepare handle bound to this reactor.
#
# @param callback [Proc, nil] handler passed to the handle; the block is used
#   when no callback is given
# @return [::Libuv::Prepare] the new handle
def prepare(callback = nil, &blk)
    handler = callback || blk
    Prepare.new(@reactor, handler)
end

#reactor_running?Boolean Also known as: running?

Tells you whether the Libuv reactor is currently running.

Returns:

  • (Boolean)


521
522
523
# File 'lib/libuv/reactor.rb', line 521

# Tells you whether the reactor is currently running.
#
# @reactor_running is only assigned inside #run, so it is still nil on a
# reactor that has never been run; the value is coerced so this predicate
# always answers with a real Boolean.
#
# @return [Boolean] true while #run is executing the loop, false otherwise
def reactor_running?
    @reactor_running ? true : false
end

#reactor_thread?Boolean

True if the calling thread is the same thread as the reactor.

Returns:

  • (Boolean)


514
515
516
# File 'lib/libuv/reactor.rb', line 514

# True if the calling thread is the same thread as the reactor, i.e. this
# reactor is the one registered in the current thread's :reactor variable.
#
# @return [Boolean]
def reactor_thread?
    registered = Thread.current.thread_variable_get(:reactor)
    self == registered
end

#refObject

Prevents the reactor loop from stopping



201
202
203
204
205
206
# File 'lib/libuv/reactor.rb', line 201

# Prevents the reactor loop from stopping. Only has an effect when called on
# the reactor thread while the reactor is running.
#
# @return [Integer, nil] the new reference count, or nil when nothing was done
def ref
    return unless reactor_thread? && reactor_running?

    # The first outstanding reference is what pins the async handle
    @process_queue.ref if @ref_count.zero?
    @ref_count += 1
end

#reject(reason) ⇒ Object

Creates a promise that is resolved as rejected with the specified reason. This api should be used to forward rejection in a chain of promises. If you are dealing with the last promise in a promise chain, you don’t need to worry about it.



269
270
271
# File 'lib/libuv/reactor.rb', line 269

# Create a promise that is resolved as rejected with the specified reason.
# Use it to forward a rejection along a chain of promises.
#
# @param reason [Object] the rejection reason
# @return [Object] the rejected promise built by Q.reject for this reactor
def reject(reason)
    rejected = Q.reject(@reactor, reason)
    rejected
end

#run(run_type = :UV_RUN_DEFAULT) {|promise| ... } ⇒ Object

Run the actual event reactor. This method will block until the reactor is stopped.

Parameters:

  • run_type (:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT) (defaults to: :UV_RUN_DEFAULT)

Yield Parameters:

  • promise (::Libuv::Q::Promise)

    Yields a promise that can be used for logging unhandled exceptions on the reactor.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/libuv/reactor.rb', line 158

# Run the actual event reactor. This method will block until the reactor is stopped.
#
# When the reactor is not yet running, the calling thread is registered as the
# reactor thread, an optional block is executed in a new Fiber (errors it
# raises are passed to #log) and the libuv loop is run. When the reactor is
# already running on the calling thread, the block is simply yielded to.
#
# @param run_type [Symbol] :UV_RUN_DEFAULT, :UV_RUN_ONCE or :UV_RUN_NOWAIT
# @yieldparam reactor [Reactor] this reactor
# @return [Reactor] this reactor
# @raise [RuntimeError] if another reactor is already registered on the calling
#   thread, or if a block is given while the reactor runs on a different thread
# @raise [Exception] the error stored in @throw_on_exit, once the loop has exited
def run(run_type = :UV_RUN_DEFAULT)
    if !@reactor_running
        # This guard must stay outside the begin/ensure below: the ensure block
        # clears the thread's :reactor variable, which would un-register the
        # reactor that is legitimately running on this thread.
        raise 'only one reactor allowed per-thread' if Thread.current.thread_variable_get(:reactor)

        begin
            @reactor_running = true
            Thread.current.thread_variable_set(:reactor, @reactor)
            @throw_on_exit = nil

            if block_given?
                update_time
                ::Fiber.new {
                    begin
                        yield @reactor
                    rescue Exception => e
                        log(e, 'in reactor run block')
                    end
                }.resume
            end
            @run_count += 1
            ::Libuv::Ext.run(@pointer, run_type)  # This is blocking
        ensure
            # Only reached when this call registered the reactor itself
            Thread.current.thread_variable_set(:reactor, nil)
            @reactor_running = false
            @run_queue.clear
        end

        # Raise the last unhandled error to occur on the reactor thread
        raise @throw_on_exit if @throw_on_exit

    elsif block_given?
        if reactor_thread?
            update_time
            yield @reactor
        else
            raise 'reactor already running on another thread'
        end
    end

    @reactor
end

#schedule(callback = nil, &block) ⇒ Object

Schedule some work to be processed on the event reactor as soon as possible (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/libuv/reactor.rb', line 460

# Schedule some work to be processed on the event reactor as soon as possible
# (thread safe). On the reactor thread the callback runs immediately;
# otherwise it is queued and the reactor is woken.
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   the block is used when no callback is given
# @return [Reactor] self
# @raise [ArgumentError] if block is not given (raised by assert_block)
def schedule(callback = nil, &block)
    task = callback || block
    assert_block(task)

    if reactor_thread?
        task.call
        return self
    end

    @run_queue << task
    @process_queue.call
    self
end

#signal(signum = nil, callback = nil, &block) ⇒ ::Libuv::Signal

Get a new signal handler

Returns:



385
386
387
388
389
390
391
# File 'lib/libuv/reactor.rb', line 385

# Get a new signal handler bound to this reactor.
#
# @param signum [Object, nil] signal to listen for; the handle is started only
#   when one is given
# @param callback [Proc, nil] handler registered through the handle's
#   `progress` method; the block is used when omitted
# @return [::Libuv::Signal] the new handle
def signal(signum = nil, callback = nil, &block)
    handler = callback || block
    Signal.new(@reactor).tap do |sig|
        sig.progress(handler) if handler
        sig.start(signum) if signum
    end
end

#stopObject

Closes handles opened by the reactor class and completes the current reactor iteration (thread safe)



507
508
509
# File 'lib/libuv/reactor.rb', line 507

# Closes handles opened by the reactor class and completes the current
# reactor iteration (thread safe). This only triggers the stop async handle.
#
# @return [Object] whatever the stop handle's `call` returns
def stop
    trigger = @stop_reactor
    trigger.call
end

#tcp(callback = nil, &blk) ⇒ ::Libuv::TCP

Get a new TCP instance

Returns:



310
311
312
313
# File 'lib/libuv/reactor.rb', line 310

# Get a new TCP instance bound to this reactor.
#
# @param callback [Proc, nil] handed to TCP.new as progress:; the block is
#   used when no callback is given
# @return [::Libuv::TCP] the new instance
def tcp(callback = nil, &blk)
    TCP.new(@reactor, progress: callback || blk)
end

#timer(callback = nil, &blk) ⇒ ::Libuv::Timer

Get a new timer instance

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on timer trigger

Returns:



346
347
348
# File 'lib/libuv/reactor.rb', line 346

# Get a new timer instance bound to this reactor.
#
# @param callback [Proc, nil] the callback to be called on timer trigger; the
#   block is used when no callback is given
# @return [::Libuv::Timer] the new timer
def timer(callback = nil, &blk)
    handler = callback || blk
    Timer.new(@reactor, handler)
end

#tty(fileno, readable = false) ⇒ ::Libuv::TTY

Get a new TTY instance

Parameters:

  • fileno (Integer)

    Integer file descriptor of a tty device

  • readable (true, false) (defaults to: false)

    Boolean indicating if TTY is readable

Returns:



328
329
330
331
332
# File 'lib/libuv/reactor.rb', line 328

# Get a new TTY instance bound to this reactor.
#
# @param fileno [Integer] Integer file descriptor of a tty device
# @param readable [true, false] Boolean indicating if TTY is readable
# @return [::Libuv::TTY] the new instance
# @raise [ArgumentError] per the documented contract of assert_type, when
#   fileno is not an Integer
def tty(fileno, readable = false)
    message = "io#fileno must return an integer file descriptor, #{fileno.inspect} given"
    assert_type(Integer, fileno, message)
    TTY.new(@reactor, fileno, readable)
end

#udp(callback = nil, &blk) ⇒ ::Libuv::UDP

Get a new UDP instance

Returns:



318
319
320
321
# File 'lib/libuv/reactor.rb', line 318

# Get a new UDP instance bound to this reactor.
#
# @param callback [Proc, nil] handed to UDP.new as progress:; the block is
#   used when no callback is given
# @return [::Libuv::UDP] the new instance
def udp(callback = nil, &blk)
    UDP.new(@reactor, progress: callback || blk)
end

#unrefObject

Allows the reactor loop to stop



209
210
211
212
213
214
# File 'lib/libuv/reactor.rb', line 209

# Allows the reactor loop to stop. Only has an effect when called on the
# reactor thread, while the reactor is running, with references outstanding.
#
# @return [Object, nil] the result of un-referencing the async handle when the
#   last reference is released, otherwise nil
def unref
    return unless reactor_thread? && reactor_running? && @ref_count > 0

    @ref_count -= 1
    # Releasing the last reference un-pins the async handle
    @process_queue.unref if @ref_count.zero?
end

#update_timeObject

forces reactor time update, useful for getting more granular times

Returns:

  • (::Libuv::Reactor) self



276
277
278
279
# File 'lib/libuv/reactor.rb', line 276

# Forces a reactor time update, useful for getting more granular times.
#
# @return [Reactor] self
def update_time
    tap { ::Libuv::Ext.update_time(@pointer) }
end

#work(callback = nil, &block) ⇒ ::Libuv::Work

Queue some work for processing in the libuv thread pool

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called in the thread pool

Returns:

Raises:

  • (ArgumentError)

    if block is not given



404
405
406
407
408
# File 'lib/libuv/reactor.rb', line 404

# Queue some work for processing in the libuv thread pool.
#
# @param callback [Proc, nil] the callback to be called in the thread pool;
#   the block is used when no callback is given
# @return [::Libuv::Work] the queued work (a promise object)
# @raise [ArgumentError] if block is not given (raised by assert_block)
def work(callback = nil, &block)
    job = callback || block
    assert_block(job)
    Work.new(@reactor, job)
end