Class: Libuv::Loop

Inherits:
Object
  • Object
show all
Extended by:
ClassMethods
Includes:
Assertions, Resource
Defined in:
lib/libuv/loop.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

LOOPS =
ThreadSafe::Cache.new
CRITICAL =
Mutex.new

Constants included from Assertions

Assertions::MSG_NO_PROC

Instance Method Summary collapse

Methods included from ClassMethods

create, current, default, new

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Constructor Details

#initialize(pointer) ⇒ Loop

Initialize a loop using an FFI::Pointer to a libuv loop



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/libuv/loop.rb', line 48

# Initializes a loop around an FFI::Pointer to a libuv loop and creates the
# handles the loop uses internally.
#
# @param pointer [FFI::Pointer] pointer to the underlying libuv loop
def initialize(pointer) # :notnew:
    @pointer = pointer
    @loop = self

    # Queue plus async handle used to schedule work from other threads
    @run_queue = Queue.new
    @process_queue = Async.new(@loop, method(:process_queue_cb))

    # Timer that backs #next_tick
    @next_tick = @loop.timer(method(:next_tick_cb))

    # Async handle used for ending the loop
    @stop_loop = Async.new(@loop, method(:stop_cb))
end

Instance Method Details

#all(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved. (thread safe)

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each value corresponding to the promise at the same index in the `promises` array. If any of the promises is resolved with a rejection, this resulting promise will be resolved with the same rejection.



157
158
159
# File 'lib/libuv/loop.rb', line 157

# Combines multiple promises into a single promise that is resolved when all
# of the input promises are resolved (thread safe).
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] resolved with an array of values, each at the
#   same index as its promise; rejected with the same rejection if any input
#   promise is rejected
def all(*promises)
    Q.all(@loop, *promises)
end

#any(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when any of the input promises are resolved.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:



167
168
169
# File 'lib/libuv/loop.rb', line 167

# Combines multiple promises into a single promise that is resolved when any
# of the input promises are resolved.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise]
def any(*promises)
    Q.any(@loop, *promises)
end

#async(callback = nil, &block) ⇒ ::Libuv::Async

Get a new Async handle

Returns:



282
283
284
285
286
287
# File 'lib/libuv/loop.rb', line 282

# Creates a new Async handle on this loop.
#
# When a callback (or block) is supplied it is registered through the handle's
# +progress+ method; an explicit callback takes precedence over the block.
#
# @param callback [Proc, nil] optional callback
# @return [::Libuv::Async]
def async(callback = nil, &block)
    listener = callback || block
    notifier = Async.new(@loop)
    notifier.progress(listener) if listener
    notifier
end

#check(callback = nil, &blk) ⇒ ::Libuv::Check

Get a new Check handle

Returns:



267
268
269
# File 'lib/libuv/loop.rb', line 267

# Creates a new Check handle on this loop.
#
# @param callback [Proc, nil] optional callback; takes precedence over the block
# @return [::Libuv::Check]
def check(callback = nil, &blk)
    handler = callback || blk
    Check.new(@loop, handler)
end

#defer::Libuv::Q::Deferred

Creates a deferred result object for cases where the result of an operation may only be returned at some point in the future or is being processed on a different thread (thread safe)



145
146
147
# File 'lib/libuv/loop.rb', line 145

# Creates a deferred result object for an operation whose result may only be
# available at some point in the future or is being processed on a different
# thread (thread safe).
#
# @return [::Libuv::Q::Deferred]
def defer
    Q.defer(@loop)
end

#file(path, flags = 0, mode = 0) ⇒ ::Libuv::File

Opens a file and returns an object that can be used to manipulate it

Parameters:

  • path (String)

    the path to the file to open

  • flags (Integer) (defaults to: 0)

    see ruby File::Constants

  • mode (Integer) (defaults to: 0)

Returns:



339
340
341
342
343
344
# File 'lib/libuv/loop.rb', line 339

# Opens a file and returns an object that can be used to manipulate it.
#
# Arguments are type-checked in order (path, flags, mode) before the file
# object is created.
#
# @param path [String] the path to the file
# @param flags [Integer] see ruby File::Constants
# @param mode [Integer]
# @return [::Libuv::File]
def file(path, flags = 0, mode = 0)
    [
        [String,  path,  "path must be a String"],
        [Integer, flags, "flags must be an Integer"],
        [Integer, mode,  "mode must be an Integer"]
    ].each { |type, value, message| assert_type(type, value, message) }

    File.new(@loop, path, flags, mode)
end

#filesystem::Libuv::Filesystem

Returns an object for manipulating the filesystem

Returns:



349
350
351
# File 'lib/libuv/loop.rb', line 349

# Returns an object for manipulating the filesystem, bound to this loop.
#
# @return [::Libuv::Filesystem]
def filesystem
    Filesystem.new(@loop)
end

#finally(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved or rejected.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each [result, wasResolved] value pair corresponding to the promise at the same index in the `promises` array.



178
179
180
# File 'lib/libuv/loop.rb', line 178

# Combines multiple promises into a single promise that is resolved when all
# of the input promises are resolved or rejected.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] resolved with an array of
#   [result, wasResolved] pairs, each at the same index as its promise
def finally(*promises)
    Q.finally(@loop, *promises)
end

#fs_event(path) ⇒ ::Libuv::FSEvent

Get a new FSEvent instance

Parameters:

  • path (String)

    the path to the file or folder for watching

Returns:

Raises:

  • (ArgumentError)

    if path is not a string



328
329
330
331
# File 'lib/libuv/loop.rb', line 328

# Creates a new FSEvent instance for the given path.
#
# @param path [String] the path to the file or folder to watch
# @return [::Libuv::FSEvent]
# @raise [ArgumentError] if path is not a String
def fs_event(path)
    # Validate before constructing the watcher
    assert_type(String, path)
    FSEvent.new(@loop, path)
end

#handleObject



107
# File 'lib/libuv/loop.rb', line 107

# Returns the pointer this loop was initialized with.
#
# @return [FFI::Pointer]
def handle
    @pointer
end

#idle(callback = nil, &block) ⇒ ::Libuv::Idle

Get a new Idle handle

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on idle trigger

Returns:



275
276
277
# File 'lib/libuv/loop.rb', line 275

# Creates a new Idle handle on this loop.
#
# @param callback [Proc, nil] the callback to be called on idle trigger;
#   takes precedence over the block
# @return [::Libuv::Idle]
def idle(callback = nil, &block)
    handler = callback || block
    Idle.new(@loop, handler)
end

#inspectObject

Overridden because errors in JRuby can hang the VM when inspecting, as many classes reference this class



102
103
104
# File 'lib/libuv/loop.rb', line 102

# Compact description: class name, object id in hex, and the current length
# of the run queue (shown as NT).
#
# @return [String]
def inspect
    format("#<%s:0x%s NT=%d>", self.class, __id__.to_s(16), @run_queue.length)
end

#log(level, id, *args) ⇒ Object

Notifies the loop there was an event that should be logged

Parameters:

  • level (Symbol)

    the error level (info, warn, error etc)

  • id (Object)

    some kind of identifying information

  • *args (*args)

    any additional information



394
395
396
# File 'lib/libuv/loop.rb', line 394

# Notifies the loop there was an event that should be logged.
#
# @param level [Symbol] the error level (info, warn, error etc)
# @param id [Object] some kind of identifying information
# @param args [Array] any additional information
def log(level, id, *args)
    # NOTE(review): @loop_notify is assigned in #run; presumably this raises
    # if called before the loop has ever been run — confirm.
    @loop_notify.notify(level, id, *args)
end

#lookup(hostname, hint = :IPv4, port = 9, &block) ⇒ ::Libuv::Dns

Lookup a hostname

Parameters:

  • hostname (String)

    the domain name to lookup

  • port (Integer, String) (defaults to: 9)

    the service being connected to

  • callback (Proc)

    the callback to be called on success

Returns:



317
318
319
320
321
# File 'lib/libuv/loop.rb', line 317

# Looks up a hostname.
#
# @param hostname [String] the domain name to lookup
# @param hint [Symbol] passed through to the Dns request (defaults to :IPv4)
# @param port [Integer, String] the service being connected to
# @yield optional block registered on the request via +then+
# @return [::Libuv::Dns]
def lookup(hostname, hint = :IPv4, port = 9, &block)
    request = Dns.new(@loop, hostname, port, hint)    # Dns is a promise object
    request.then(block) unless block.nil?
    request
end

#lookup_error(err) ⇒ ::Libuv::Error

Lookup an error code and return it as an error object

Parameters:

  • err (Integer)

    The error code to look up.

Returns:



201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/libuv/loop.rb', line 201

# Looks up a libuv error code and returns it as an error object.
#
# If the lookup itself fails (no name for the code, or no matching error
# class), the failure is logged on the loop as :error_lookup_failed and the
# failure exception is returned instead of being raised.
#
# Only StandardError is rescued, so process-level exceptions such as
# Interrupt, SystemExit and NoMemoryError propagate to the caller instead of
# being logged and returned as a value.
#
# @param err [Integer] the error code to look up
# @return [::Libuv::Error, StandardError] the mapped error, or the lookup failure
def lookup_error(err)
    name = ::Libuv::Ext.err_name(err)

    # Raising (rather than building an error) gives the failure a back-trace
    raise "error lookup failed for code #{err}" unless name

    msg = ::Libuv::Ext.strerror(err)
    ::Libuv::Error.const_get(name.to_sym).new(msg)
rescue StandardError => e
    @loop.log :warn, :error_lookup_failed, e
    e
end

#next_tick(callback = nil, &block) ⇒ Object

Queue some work to be processed in the next iteration of the event loop (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/libuv/loop.rb', line 373

# Queues some work to be processed in the next iteration of the event loop
# (thread safe).
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   takes precedence over the block
# @raise [ArgumentError] if block is not given
def next_tick(callback = nil, &block)
    task = callback || block
    assert_block(task)
    @run_queue << task

    # Callers on other threads wake the loop through the async handle
    return @process_queue.call unless reactor_thread?

    # On the reactor thread the next tick timer is only started once per batch
    return if @next_tick_scheduled

    @next_tick.start(0)
    @next_tick_scheduled = true
end

#notifier::Libuv::Q::Promise

Provides a promise notifier for receiving un-handled exceptions

Returns:



137
138
139
# File 'lib/libuv/loop.rb', line 137

# Provides a promise notifier for receiving un-handled exceptions.
#
# @return [::Libuv::Q::Promise]
def notifier
    # NOTE(review): @loop_notify is assigned in #run; presumably unavailable
    # before the loop has been run — confirm.
    @loop_notify.promise
end

#nowFixnum

Get current time in milliseconds

Returns:

  • (Fixnum)


193
194
195
# File 'lib/libuv/loop.rb', line 193

# Gets the current time in milliseconds, as reported by libuv for this loop.
#
# @return [Integer]
def now
    ::Libuv::Ext.now(@pointer)
end

#pipe(ipc = false) ⇒ ::Libuv::Pipe

Get a new Pipe instance

Parameters:

  • ipc (true, false) (defaults to: false)

    indicate if a handle will be used for ipc, useful for sharing tcp socket between processes

Returns:



245
246
247
# File 'lib/libuv/loop.rb', line 245

# Creates a new Pipe instance on this loop.
#
# @param ipc [true, false] indicates if the handle will be used for ipc,
#   useful for sharing a tcp socket between processes
# @return [::Libuv::Pipe]
def pipe(ipc = false)
    Pipe.new(@loop, ipc)
end

#prepare(callback = nil, &blk) ⇒ ::Libuv::Prepare

Get a new Prepare handle

Returns:



260
261
262
# File 'lib/libuv/loop.rb', line 260

# Creates a new Prepare handle on this loop.
#
# @param callback [Proc, nil] optional callback; takes precedence over the block
# @return [::Libuv::Prepare]
def prepare(callback = nil, &blk)
    handler = callback || blk
    Prepare.new(@loop, handler)
end

#reactor_running?Boolean

Tells you whether the Libuv reactor loop is currently running.

Returns:

  • (Boolean)


420
421
422
# File 'lib/libuv/loop.rb', line 420

# Tells you whether the Libuv reactor loop is currently running.
#
# True while a reactor thread is recorded; #run records it on entry and
# clears it again when the loop exits.
#
# @return [Boolean]
def reactor_running?
    !@reactor_thread.nil?
end

#reactor_threadThread

Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endeavor).

Returns:

  • (Thread)


413
414
415
# File 'lib/libuv/loop.rb', line 413

# Exposed to allow joining on the thread when run in a multithreaded
# environment. Performing other actions on the thread has undefined semantics.
#
# @return [Thread, nil] nil when no reactor thread is recorded (#run clears it on exit)
def reactor_thread
    @reactor_thread
end

#reactor_thread?Boolean

True if the calling thread is the same thread as the reactor.

Returns:

  • (Boolean)


406
407
408
# File 'lib/libuv/loop.rb', line 406

# True if the calling thread is the same thread as the reactor.
#
# @return [Boolean]
def reactor_thread?
    @reactor_thread == Thread.current
end

#run(run_type = :UV_RUN_DEFAULT) {|promise| ... } ⇒ Object

Run the actual event loop. This method will block until the loop is stopped.

Parameters:

  • run_type (:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT) (defaults to: :UV_RUN_DEFAULT)

Yield Parameters:

  • promise (::Libuv::Q::Promise)

    Yields a promise that can be used for logging unhandled exceptions on the loop.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/libuv/loop.rb', line 114

# Runs the actual event loop. This method blocks until the loop is stopped.
#
# If the loop is already running, nothing is started; a supplied block is
# instead scheduled onto the loop and receives the existing notifier promise.
#
# @param run_type [:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT]
# @yieldparam promise [::Libuv::Q::Promise] a promise that can be used for
#   logging unhandled exceptions on the loop
# @return [Loop] this loop
def run(run_type = :UV_RUN_DEFAULT)
    unless @reactor_thread.nil?
        # Already running: only hand the notifier promise to the block
        schedule { yield @loop_notify.promise } if block_given?
        return @loop
    end

    @loop_notify = @loop.defer

    begin
        # Record which thread owns the loop before any user code runs
        @reactor_thread = Thread.current
        LOOPS[@reactor_thread] = @loop
        yield @loop_notify.promise if block_given?
        ::Libuv::Ext.run(@pointer, run_type)  # This is blocking
    ensure
        # Whether the loop exits normally or not, mark it as stopped and
        # drop any work that was still queued
        @reactor_thread = nil
        @run_queue.clear
    end

    @loop
end

#schedule(callback = nil, &block) ⇒ Object

Schedule some work to be processed on the event loop as soon as possible (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



357
358
359
360
361
362
363
364
365
366
367
# File 'lib/libuv/loop.rb', line 357

# Schedules some work to be processed on the event loop as soon as possible
# (thread safe).
#
# On the reactor thread the callback is invoked immediately; from any other
# thread it is queued and the loop is signalled through the async handle.
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   takes precedence over the block
# @raise [ArgumentError] if block is not given
def schedule(callback = nil, &block)
    job = callback || block
    assert_block(job)

    return job.call if reactor_thread?

    @run_queue << job
    @process_queue.call
end

#signal(signum = nil, callback = nil, &block) ⇒ ::Libuv::Signal

Get a new signal handler

Returns:



292
293
294
295
296
297
298
# File 'lib/libuv/loop.rb', line 292

# Creates a new signal handler on this loop.
#
# A callback (or block) is registered through the handle's +progress+ method,
# and the handle is started straight away when a signal number is supplied.
#
# @param signum [Integer, nil] optional signal to start watching immediately
# @param callback [Proc, nil] optional callback; takes precedence over the block
# @return [::Libuv::Signal]
def signal(signum = nil, callback = nil, &block)
    listener = callback || block
    watcher = Signal.new(@loop)
    watcher.progress(listener) if listener
    watcher.start(signum) if signum
    watcher
end

#stopObject

Closes handles opened by the loop class and completes the current loop iteration (thread safe)



399
400
401
# File 'lib/libuv/loop.rb', line 399

# Closes handles opened by the loop class and completes the current loop
# iteration (thread safe).
#
# This only signals the async handle created in #initialize (bound to
# stop_cb); the shutdown work itself happens in that callback.
def stop
    @stop_loop.call
end

#tcp::Libuv::TCP

Get a new TCP instance

Returns:



219
220
221
# File 'lib/libuv/loop.rb', line 219

# Creates a new TCP instance on this loop.
#
# @return [::Libuv::TCP]
def tcp
    TCP.new(@loop)
end

#timer(callback = nil, &blk) ⇒ ::Libuv::Timer

Get a new timer instance

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on timer trigger

Returns:



253
254
255
# File 'lib/libuv/loop.rb', line 253

# Creates a new timer instance on this loop.
#
# @param callback [Proc, nil] the callback to be called on timer trigger;
#   takes precedence over the block
# @return [::Libuv::Timer]
def timer(callback = nil, &blk)
    handler = callback || blk
    Timer.new(@loop, handler)
end

#tty(fileno, readable = false) ⇒ ::Libuv::TTY

Get a new TTY instance

Parameters:

  • fileno (Integer)

    Integer file descriptor of a tty device

  • readable (true, false) (defaults to: false)

    Boolean indicating if TTY is readable

Returns:



235
236
237
238
239
# File 'lib/libuv/loop.rb', line 235

# Creates a new TTY instance on this loop.
#
# @param fileno [Integer] integer file descriptor of a tty device
# @param readable [true, false] indicates if the TTY is readable
# @return [::Libuv::TTY]
def tty(fileno, readable = false)
    complaint = "io#fileno must return an integer file descriptor, #{fileno.inspect} given"
    assert_type(Integer, fileno, complaint)

    TTY.new(@loop, fileno, readable)
end

#udp::Libuv::UDP

Get a new UDP instance

Returns:



226
227
228
# File 'lib/libuv/loop.rb', line 226

# Creates a new UDP instance on this loop.
#
# @return [::Libuv::UDP]
def udp
    UDP.new(@loop)
end

#update_timeObject

Forces a loop time update, useful for getting more granular times

Returns:

  • nil



186
187
188
# File 'lib/libuv/loop.rb', line 186

# Forces a loop time update, useful for getting more granular times.
#
# Documented as returning nil; the value is whatever the underlying
# Ext.update_time call returns.
def update_time
    ::Libuv::Ext.update_time(@pointer)
end

#work(callback = nil, &block) ⇒ ::Libuv::Work

Queue some work for processing in the libuv thread pool

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called in the thread pool

Returns:

Raises:

  • (ArgumentError)

    if block is not given



305
306
307
308
309
# File 'lib/libuv/loop.rb', line 305

# Queues some work for processing in the libuv thread pool.
#
# @param callback [Proc, nil] the callback to be called in the thread pool;
#   takes precedence over the block
# @return [::Libuv::Work]
# @raise [ArgumentError] if block is not given
def work(callback = nil, &block)
    task = callback || block
    assert_block(task)
    Work.new(@loop, task)    # Work is a promise object
end