Class: Libuv::Loop

Inherits:
Object show all
Extended by:
ClassMethods
Includes:
Assertions, Resource
Defined in:
lib/libuv/loop.rb,
lib/libuv/coroutines.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

LOOPS =
ThreadSafe::Cache.new
CRITICAL =
Mutex.new
@@use_fibers =
true

Constants included from Assertions

Assertions::MSG_NO_PROC

Instance Method Summary collapse

Methods included from ClassMethods

create, current, default, new

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Constructor Details

#initialize(pointer) ⇒ Loop

Initialize a loop using an FFI::Pointer to a libuv loop



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/libuv/loop.rb', line 49

# Wraps an existing libuv loop pointer and creates the helper handles this
# object uses internally: an async handle for draining the run queue, a timer
# for next-tick work and an async handle for stopping the loop. Every helper
# is unref'd right after creation.
# NOTE(review): presumably the unref keeps these internal handles from holding
# the loop open on their own — confirm against the handle implementation.
#
# @param pointer [FFI::Pointer] pointer to the underlying libuv loop
def initialize(pointer) # :notnew:
    @pointer = pointer
    # The handle factories used below (#async, #timer) pass @loop to the
    # handle constructors, so it must be assigned before they are called.
    @loop = self

    # Create an async call for scheduling work from other threads
    @run_queue = Queue.new
    @process_queue = @loop.async method(:process_queue_cb)
    @process_queue.unref

    # Create a next tick timer
    @next_tick = @loop.timer method(:next_tick_cb)
    @next_tick.unref

    # Create an async call for ending the loop
    @stop_loop = @loop.async method(:stop_cb)
    @stop_loop.unref
end

Instance Method Details

#all(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved. (thread safe)

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each value corresponding to the promise at the same index in the `promises` array. If any of the promises is resolved with a rejection, this resulting promise will be resolved with the same rejection.



172
173
174
# File 'lib/libuv/loop.rb', line 172

# Thin wrapper: forwards this loop followed by the given promises to Q.all
# and returns whatever Q.all returns.
#
# @param promises [Array] the promises to combine
def all(*promises)
    Q.all(@loop, *promises)
end

#any(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when any of the input promises are resolved.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:



182
183
184
# File 'lib/libuv/loop.rb', line 182

# Thin wrapper: forwards this loop followed by the given promises to Q.any
# and returns whatever Q.any returns.
#
# @param promises [Array] the promises to combine
def any(*promises)
    Q.any(@loop, *promises)
end

#async(callback = nil, &block) ⇒ ::Libuv::Async

Get a new Async handle

Returns:



297
298
299
300
301
302
# File 'lib/libuv/loop.rb', line 297

# Builds a new Async handle bound to this loop.
#
# The handler may be supplied either as a positional callable or as a block;
# the positional argument takes precedence. When neither is given the handle
# is returned without a progress handler registered.
#
# @param callback [#call, nil] optional handler
# @return the newly created Async handle
def async(callback = nil, &block)
    handler = callback || block
    Async.new(@loop).tap do |async_handle|
        async_handle.progress(handler) if handler
    end
end

#check(callback = nil, &blk) ⇒ ::Libuv::Check

Get a new Check handle

Returns:



282
283
284
# File 'lib/libuv/loop.rb', line 282

# Builds a new Check handle bound to this loop.
#
# @param callback [#call, nil] handler; takes precedence over a block
# @return the newly created Check handle
def check(callback = nil, &blk)
    handler = callback || blk
    Check.new(@loop, handler)
end

#defer ⇒ ::Libuv::Q::Deferred

Creates a deferred result object for where the result of an operation may only be returned at some point in the future or is being processed on a different thread (thread safe)



160
161
162
# File 'lib/libuv/loop.rb', line 160

# Thin wrapper: asks Q.defer for a new deferred bound to this loop and
# returns it.
def defer
    Q.defer(@loop)
end

#file(path, flags = 0, mode = 0) ⇒ ::Libuv::File

Opens a file and returns an object that can be used to manipulate it

Parameters:

  • path (String)

    the path to the file to open

  • flags (Integer) (defaults to: 0)

    see ruby File::Constants

  • mode (Integer) (defaults to: 0)

Returns:



354
355
356
357
358
359
# File 'lib/libuv/loop.rb', line 354

# Validates the arguments and builds a File object bound to this loop.
#
# Arguments are checked in order (path, flags, mode); the first one of the
# wrong type is reported through assert_type with its own message.
#
# @param path [String] path handed to File.new
# @param flags [Integer] flags handed to File.new
# @param mode [Integer] mode handed to File.new
# @return the newly created File object
def file(path, flags = 0, mode = 0)
    [
        [String,  path,  "path must be a String"],
        [Integer, flags, "flags must be an Integer"],
        [Integer, mode,  "mode must be an Integer"]
    ].each { |type, value, message| assert_type(type, value, message) }

    File.new(@loop, path, flags, mode)
end

#filesystem ⇒ ::Libuv::Filesystem

Returns an object for manipulating the filesystem

Returns:



364
365
366
# File 'lib/libuv/loop.rb', line 364

# Builds and returns a new Filesystem object bound to this loop.
def filesystem
    Filesystem.new(@loop)
end

#finally(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved or rejected.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each [result, wasResolved] value pair corresponding to the promise at the same index in the `promises` array.



193
194
195
# File 'lib/libuv/loop.rb', line 193

# Thin wrapper: forwards this loop followed by the given promises to
# Q.finally and returns whatever Q.finally returns.
#
# @param promises [Array] the promises to combine
def finally(*promises)
    Q.finally(@loop, *promises)
end

#fs_event(path) ⇒ ::Libuv::FSEvent

Get a new FSEvent instance

Parameters:

  • path (String)

    the path to the file or folder for watching

Returns:

Raises:

  • (ArgumentError)

    if path is not a string



343
344
345
346
# File 'lib/libuv/loop.rb', line 343

# Validates the path and builds a new FSEvent bound to this loop.
#
# @param path [String] path handed to FSEvent.new
# @return the newly created FSEvent
def fs_event(path)
    # No custom message is passed here, so any failure text comes from
    # assert_type itself
    assert_type(String, path)
    FSEvent.new(@loop, path)
end

#handle ⇒ Object



112
# File 'lib/libuv/loop.rb', line 112

# Returns the pointer this loop was initialised with.
def handle
    @pointer
end

#idle(callback = nil, &block) ⇒ ::Libuv::Idle

Get a new Idle handle

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on idle trigger

Returns:



290
291
292
# File 'lib/libuv/loop.rb', line 290

# Builds a new Idle handle bound to this loop.
#
# @param callback [#call, nil] handler; takes precedence over a block
# @return the newly created Idle handle
def idle(callback = nil, &block)
    handler = callback || block
    Idle.new(@loop, handler)
end

#inspect ⇒ Object

Overridden because errors in JRuby can hang the VM during inspection, since a great many classes hold a reference to this class.



107
108
109
# File 'lib/libuv/loop.rb', line 107

def inspect
    "#<#{self.class}:0x#{self.__id__.to_s(16)} NT=#{@run_queue.length}>"
end

#log(level, id, *args) ⇒ Object

Notifies the loop there was an event that should be logged

Parameters:

  • level (Symbol)

    the error level (info, warn, error etc)

  • id (Object)

    some kind of identifying information

  • *args (*args)

    any additional information



410
411
412
# File 'lib/libuv/loop.rb', line 410

# Forwards a log event to the deferred held in @loop_notify.
#
# NOTE(review): within this class @loop_notify is only assigned inside #run,
# so calling this before the loop has been run looks like it would hit nil —
# confirm.
#
# @param level [Symbol] passed through to notify
# @param id [Object] passed through to notify
# @param args any further values, passed through to notify
def log(level, id, *args)
    @loop_notify.notify(level, id, *args)
end

#lookup(hostname, hint = :IPv4, port = 9, &block) ⇒ ::Libuv::Dns

Lookup a hostname

Parameters:

  • hostname (String)

    the domain name to lookup

  • port (Integer, String) (defaults to: 9)

    the service being connected to

  • callback (Proc)

    the callback to be called on success

Returns:



332
333
334
335
336
# File 'lib/libuv/loop.rb', line 332

# Starts a DNS lookup for the given hostname and returns the Dns object.
#
# Note the argument order handed to Dns.new: port comes before hint. When a
# block is given it is passed to the Dns object's #then.
#
# @param hostname [String] name to resolve
# @param hint [Symbol] forwarded to Dns.new
# @param port [Integer, String] forwarded to Dns.new
# @return the newly created Dns object
def lookup(hostname, hint = :IPv4, port = 9, &block)
    Dns.new(@loop, hostname, port, hint).tap do |dns|
        # Dns is used as a promise object here: the block becomes its handler
        dns.then(block) if block
    end
end

#lookup_error(err) ⇒ ::Libuv::Error

Lookup an error code and return it as an error object

Parameters:

  • err (Integer)

    The error code to look up.

Returns:



216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/libuv/loop.rb', line 216

# Translates a libuv error code into an instance of the matching
# ::Libuv::Error subclass.
#
# Ordinary failures never escape this method: when the code has no name, or
# the lookup itself raises a StandardError, the failure is logged through the
# loop and the exception object is returned instead of being raised.
#
# @param err [Integer] the libuv error code to look up
# @return [Exception] the matching ::Libuv::Error instance, or the
#   exception describing why the lookup failed
def lookup_error(err)
    name = ::Libuv::Ext.err_name(err)

    # We want a back-trace in this case, so raise and let the rescue below
    # capture and return the exception
    raise "error lookup failed for code #{err}" unless name

    message = ::Libuv::Ext.strerror(err)
    ::Libuv::Error.const_get(name.to_sym).new(message)
rescue StandardError => e
    # StandardError rather than Exception: Interrupt, SystemExit and similar
    # must propagate to the caller rather than be returned as a value
    @loop.log :warn, :error_lookup_failed, e
    e
end

#next_tick(callback = nil, &block) ⇒ Object

Queue some work to be processed in the next iteration of the event loop (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/libuv/loop.rb', line 388

# Queues a callable to be run from the loop's run queue.
#
# The callable is always appended to @run_queue. What happens next depends on
# the calling thread:
# * off the reactor thread, the @process_queue async handle is triggered;
# * on the reactor thread, the next-tick timer is started and ref'd, unless a
#   tick has already been scheduled.
#
# @param callback [#call, nil] work to run; takes precedence over a block
# @raise whatever assert_block raises when neither callback nor block is given
def next_tick(callback = nil, &block)
    task = callback || block
    assert_block(task)

    @run_queue << task
    return @process_queue.call unless reactor_thread?
    return if @next_tick_scheduled

    # First tick requested since the flag was last cleared: arm the timer
    @next_tick.start(0)
    @next_tick_scheduled = true
    @next_tick.ref
end

#notifier ⇒ ::Libuv::Q::Promise

Provides a promise notifier for receiving un-handled exceptions

Returns:



152
153
154
# File 'lib/libuv/loop.rb', line 152

# Returns the promise of the deferred held in @loop_notify.
#
# NOTE(review): within this class @loop_notify is only assigned inside #run,
# so calling this before the loop has been run looks like it would hit nil —
# confirm.
def notifier
    @loop_notify.promise
end

#now ⇒ Fixnum

Get current time in milliseconds

Returns:

  • (Fixnum)


208
209
210
# File 'lib/libuv/loop.rb', line 208

# Returns the result of ::Libuv::Ext.now for this loop's pointer
# (documented for this method as the current time in milliseconds).
def now
    ::Libuv::Ext.now(@pointer)
end

#pipe(ipc = false) ⇒ ::Libuv::Pipe

Get a new Pipe instance

Parameters:

  • ipc (true, false) (defaults to: false)

    indicate if a handle will be used for ipc, useful for sharing tcp socket between processes

Returns:



260
261
262
# File 'lib/libuv/loop.rb', line 260

# Builds and returns a new Pipe bound to this loop.
#
# @param ipc [true, false] forwarded unchanged to Pipe.new
def pipe(ipc = false)
    Pipe.new(@loop, ipc)
end

#prepare(callback = nil, &blk) ⇒ ::Libuv::Prepare

Get a new Prepare handle

Returns:



275
276
277
# File 'lib/libuv/loop.rb', line 275

# Builds a new Prepare handle bound to this loop.
#
# @param callback [#call, nil] handler; takes precedence over a block
# @return the newly created Prepare handle
def prepare(callback = nil, &blk)
    handler = callback || blk
    Prepare.new(@loop, handler)
end

#reactor_running? ⇒ Boolean

Tells you whether the Libuv reactor loop is currently running.

Returns:

  • (Boolean)


436
437
438
# File 'lib/libuv/loop.rb', line 436

# Reports whether a reactor thread is currently recorded for this loop.
#
# @return [Boolean] false while @reactor_thread is nil, true otherwise
def reactor_running?
    return false if @reactor_thread.nil?

    true
end

#reactor_thread ⇒ Thread

Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endeavor).

Returns:

  • (Thread)


429
430
431
# File 'lib/libuv/loop.rb', line 429

# Returns the thread recorded as running this loop. #run stores
# Thread.current here while it is running and resets it to nil afterwards,
# so the value is nil whenever the loop is not running.
def reactor_thread
    @reactor_thread
end

#reactor_thread? ⇒ Boolean

True if the calling thread is the same thread as the reactor.

Returns:

  • (Boolean)


422
423
424
# File 'lib/libuv/loop.rb', line 422

# Reports whether the caller is executing on the thread recorded as the
# reactor thread.
#
# @return [Boolean]
def reactor_thread?
    calling_thread = Thread.current
    @reactor_thread == calling_thread
end

#run(run_type = :UV_RUN_DEFAULT) {|promise| ... } ⇒ Object

Run the actual event loop. This method will block until the loop is stopped.

Parameters:

  • run_type (:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT) (defaults to: :UV_RUN_DEFAULT)

Yield Parameters:

  • promise (::Libuv::Q::Promise)

    Yields a promise that can be used for logging unhandled exceptions on the loop.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/libuv/loop.rb', line 119

# Runs the event loop on the calling thread.
#
# When no reactor thread is recorded, the calling thread becomes the reactor
# thread, the optional block is yielded the notifier promise, and
# ::Libuv::Ext.run is invoked (blocking). When a reactor thread is already
# recorded, the block (if any) is handed to #schedule instead and nothing is
# run by this call.
#
# @param run_type [Symbol] passed through to ::Libuv::Ext.run
# @yieldparam promise the promise of the deferred stored in @loop_notify
# @return the loop (@loop)
def run(run_type = :UV_RUN_DEFAULT)
    if @reactor_thread.nil?
        # A fresh deferred is created for each run; #log and #notifier read it
        @loop_notify = @loop.defer

        begin
            @reactor_thread = Thread.current
            # Register this loop against the thread that is running it
            LOOPS[@reactor_thread] = @loop
            if block_given?
                if @@use_fibers
                    # Run the caller's block inside its own Fiber
                    Fiber.new { yield @loop_notify.promise }.resume
                else
                    yield @loop_notify.promise
                end
            end
            ::Libuv::Ext.run(@pointer, run_type)  # This is blocking
        ensure
            # Always mark the loop as stopped and drop queued callbacks, even
            # if the block or the native run call raised.
            # NOTE(review): the LOOPS entry added above is not removed here —
            # confirm that is intended.
            @reactor_thread = nil
            @run_queue.clear
        end
    elsif block_given?
        # A reactor thread is already recorded: hand the block to #schedule
        # rather than starting a second run
        if @@use_fibers
            schedule { Fiber.new { yield @loop_notify.promise }.resume }
        else
            schedule { yield @loop_notify.promise }
        end
    end
    @loop
end

#schedule(callback = nil, &block) ⇒ Object

Schedule some work to be processed on the event loop as soon as possible (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



372
373
374
375
376
377
378
379
380
381
382
# File 'lib/libuv/loop.rb', line 372

# Runs a callable on the reactor thread as soon as possible.
#
# On the reactor thread the callable is invoked immediately and its result is
# returned. From any other thread it is appended to @run_queue and the
# @process_queue async handle is triggered.
#
# @param callback [#call, nil] work to run; takes precedence over a block
# @raise whatever assert_block raises when neither callback nor block is given
def schedule(callback = nil, &block)
    task = callback || block
    assert_block(task)

    # Already on the reactor thread: no need to go through the queue
    return task.call if reactor_thread?

    @run_queue << task
    @process_queue.call
end

#signal(signum = nil, callback = nil, &block) ⇒ ::Libuv::Signal

Get a new signal handler

Returns:



307
308
309
310
311
312
313
# File 'lib/libuv/loop.rb', line 307

# Builds a new Signal handle bound to this loop.
#
# The handler (positional callable, or else the block) is registered via
# #progress when present, and the handle is started only when a signal number
# is supplied. Registration happens before the start.
#
# @param signum [Object, nil] passed to #start when given
# @param callback [#call, nil] handler; takes precedence over a block
# @return the newly created Signal handle
def signal(signum = nil, callback = nil, &block)
    handler = callback || block
    Signal.new(@loop).tap do |watcher|
        watcher.progress(handler) if handler
        watcher.start(signum) if signum
    end
end

#stop ⇒ Object

Closes handles opened by the loop class and completes the current loop iteration (thread safe)



415
416
417
# File 'lib/libuv/loop.rb', line 415

# Triggers the @stop_loop async handle, which #initialize creates with the
# stop_cb callback. The stopping itself is done by that callback, not here.
def stop
    @stop_loop.call
end

#tcp ⇒ ::Libuv::TCP

Get a new TCP instance

Returns:



234
235
236
# File 'lib/libuv/loop.rb', line 234

# Builds and returns a new TCP object bound to this loop.
def tcp
    TCP.new(@loop)
end

#timer(callback = nil, &blk) ⇒ ::Libuv::Timer

Get a new timer instance

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on timer trigger

Returns:



268
269
270
# File 'lib/libuv/loop.rb', line 268

# Builds a new Timer bound to this loop.
#
# @param callback [#call, nil] handler; takes precedence over a block
# @return the newly created Timer
def timer(callback = nil, &blk)
    handler = callback || blk
    Timer.new(@loop, handler)
end

#tty(fileno, readable = false) ⇒ ::Libuv::TTY

Get a new TTY instance

Parameters:

  • fileno (Integer)

    Integer file descriptor of a tty device

  • readable (true, false) (defaults to: false)

    Boolean indicating if TTY is readable

Returns:



250
251
252
253
254
# File 'lib/libuv/loop.rb', line 250

# Validates the file descriptor and builds a new TTY bound to this loop.
#
# @param fileno [Integer] file descriptor handed to TTY.new
# @param readable [true, false] forwarded unchanged to TTY.new
# @return the newly created TTY
# @raise whatever assert_type raises when fileno is not an Integer
def tty(fileno, readable = false)
    type_error = "io#fileno must return an integer file descriptor, #{fileno.inspect} given"
    assert_type(Integer, fileno, type_error)

    TTY.new(@loop, fileno, readable)
end

#udp ⇒ ::Libuv::UDP

Get a new UDP instance

Returns:



241
242
243
# File 'lib/libuv/loop.rb', line 241

# Builds and returns a new UDP object bound to this loop.
def udp
    UDP.new(@loop)
end

#update_time ⇒ Object

Forces a loop time update; useful for getting more granular times

Returns:

  • nil



201
202
203
# File 'lib/libuv/loop.rb', line 201

# Calls ::Libuv::Ext.update_time for this loop's pointer and returns its
# result.
def update_time
    ::Libuv::Ext.update_time(@pointer)
end

#work(callback = nil, &block) ⇒ ::Libuv::Work

Queue some work for processing in the libuv thread pool

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called in the thread pool

Returns:

Raises:

  • (ArgumentError)

    if block is not given



320
321
322
323
324
# File 'lib/libuv/loop.rb', line 320

# Wraps a callable in a new Work object bound to this loop.
#
# @param callback [#call, nil] the work to perform; takes precedence over a
#   block
# @return the newly created Work object (used as a promise)
# @raise whatever assert_block raises when neither callback nor block is given
def work(callback = nil, &block)
    task = callback || block
    assert_block(task)

    Work.new(@loop, task)
end