Class: Libuv::Loop

Inherits:
Object
  • Object
show all
Extended by:
ClassMethods
Includes:
Assertions, Resource
Defined in:
lib/libuv/loop.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

LOOPS =
ThreadSafe::Cache.new

Constants included from Assertions

Assertions::MSG_NO_PROC

Instance Method Summary collapse

Methods included from ClassMethods

create, current, default, new

Methods included from Resource

#check_result, #check_result!, #resolve, #to_ptr

Methods included from Assertions

#assert_block, #assert_boolean, #assert_type

Constructor Details

#initialize(pointer) ⇒ Loop

Initialize a loop using an FFI::Pointer to a libuv loop



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/libuv/loop.rb', line 44

# Initialize a loop using an FFI::Pointer to a libuv loop.
#
# Besides storing the pointer, this creates the helper handles the loop keeps
# for itself: an Async handle built around the run-queue drain proc, a timer
# that runs the same drain proc for next-tick work, and an Async handle whose
# block tears the loop down.
#
# @param pointer [FFI::Pointer] pointer to the underlying libuv loop
def initialize(pointer) # :notnew:
    @pointer = pointer
    @loop = self

    # Create an async call for scheduling work from other threads
    @run_queue = Queue.new
    @queue_proc = proc do
        # ensure we only execute what was required for this tick
        # (the length is sampled once, so callbacks queued while draining
        # are left for a later pass)
        length = @run_queue.length
        length.times do
            begin
                run = @run_queue.pop true  # pop non-block
                run.call
            rescue Exception => e
                # Report the failure through #log and carry on with the
                # remaining callbacks instead of aborting the drain
                @loop.log :error, :next_tick_cb, e
            end
        end
    end
    @process_queue = Async.new(@loop, @queue_proc)

    # Create a next tick timer
    @next_tick = @loop.timer do
        # Clear the flag first so #next_tick can re-arm the timer
        @next_tick_scheduled = false
        @queue_proc.call
    end

    # Create an async call for ending the loop
    @stop_loop = Async.new @loop do
        # Drop the thread => loop registration, then close the handles
        # created above before asking libuv to stop the loop
        LOOPS.delete(@reactor_thread)
        @reactor_thread = nil
        @process_queue.close
        @stop_loop.close
        @next_tick.close

        ::Libuv::Ext.stop(@pointer)
    end
end

Instance Method Details

#all(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved. (thread safe)

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each value corresponding to the promise at the same index in the `promises` array. If any of the promises is resolved with a rejection, this resulting promise will be resolved with the same rejection.



125
126
127
# File 'lib/libuv/loop.rb', line 125

# Combines multiple promises into a single promise that is resolved when all
# of the input promises are resolved. (thread safe)
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise, built by Q.all for this loop
def all(*promises)
    Q.all(@loop, *promises)
end

#any(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when any of the input promises are resolved.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:



135
136
137
# File 'lib/libuv/loop.rb', line 135

# Combines multiple promises into a single promise that is resolved when any
# of the input promises are resolved.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise, built by Q.any for this loop
def any(*promises)
    Q.any(@loop, *promises)
end

#async(callback = nil, &block) ⇒ ::Libuv::Async

Get a new Async handle

Returns:



245
246
247
248
249
250
# File 'lib/libuv/loop.rb', line 245

# Get a new Async handle for this loop.
#
# The handler may be supplied either as +callback+ or as a block; +callback+
# wins when both are present. When neither is given the handle is returned
# without a progress handler attached.
#
# @param callback [Proc, nil] handler registered through the handle's #progress
# @return [::Libuv::Async] the newly created handle
def async(callback = nil, &block)
    handler = callback || block
    Async.new(@loop).tap do |async_handle|
        async_handle.progress(handler) if handler
    end
end

#check(callback = nil, &blk) ⇒ ::Libuv::Check

Get a new Check handle

Returns:



230
231
232
# File 'lib/libuv/loop.rb', line 230

# Get a new Check handle for this loop.
#
# @param callback [Proc, nil] handler passed to the handle; the block is used
#   when no callback is supplied
# @return [::Libuv::Check]
def check(callback = nil, &blk)
    Check.new(@loop, callback || blk)
end

#defer ⇒ ::Libuv::Q::Deferred

Creates a deferred result object for where the result of an operation may only be returned at some point in the future or is being processed on a different thread (thread safe)



113
114
115
# File 'lib/libuv/loop.rb', line 113

# Creates a deferred result object for where the result of an operation may
# only be returned at some point in the future or is being processed on a
# different thread (thread safe)
#
# @return [::Libuv::Q::Deferred] a deferred created by Q.defer for this loop
def defer
    Q.defer(@loop)
end

#file(path, flags = 0, mode = 0) ⇒ ::Libuv::File

Opens a file and returns an object that can be used to manipulate it

Parameters:

  • path (String)

    the path to the file to open

  • flags (Integer) (defaults to: 0)

    see ruby File::Constants

  • mode (Integer) (defaults to: 0)

Returns:



302
303
304
305
306
307
# File 'lib/libuv/loop.rb', line 302

# Opens a file and returns an object that can be used to manipulate it.
#
# Each argument is type-checked with assert_type, in order (path, flags,
# mode), before the File object is constructed.
#
# @param path [String] the path of the file to open
# @param flags [Integer] see ruby File::Constants
# @param mode [Integer]
# @return [::Libuv::File]
def file(path, flags = 0, mode = 0)
    [
        [String,  path,  "path must be a String"],
        [Integer, flags, "flags must be an Integer"],
        [Integer, mode,  "mode must be an Integer"]
    ].each { |expected, value, message| assert_type(expected, value, message) }

    File.new(@loop, path, flags, mode)
end

#filesystem ⇒ ::Libuv::Filesystem

Returns an object for manipulating the filesystem

Returns:



312
313
314
# File 'lib/libuv/loop.rb', line 312

# Returns an object for manipulating the filesystem.
#
# @return [::Libuv::Filesystem] a new Filesystem bound to this loop
def filesystem
    Filesystem.new(@loop)
end

#finally(*promises) ⇒ ::Libuv::Q::Promise

Combines multiple promises into a single promise that is resolved when all of the input promises are resolved or rejected.

Parameters:

  • *promises (::Libuv::Q::Promise)

    a number of promises that will be combined into a single promise

Returns:

  • (::Libuv::Q::Promise)

    Returns a single promise that will be resolved with an array of values, each [result, wasResolved] value pair corresponding to the promise at the same index in the `promises` array.



146
147
148
# File 'lib/libuv/loop.rb', line 146

# Combines multiple promises into a single promise that is resolved when all
# of the input promises are resolved or rejected.
#
# @param promises [Array<::Libuv::Q::Promise>] the promises to combine
# @return [::Libuv::Q::Promise] the combined promise, built by Q.finally for this loop
def finally(*promises)
    Q.finally(@loop, *promises)
end

#fs_event(path) ⇒ ::Libuv::FSEvent

Get a new FSEvent instance

Parameters:

  • path (String)

    the path to the file or folder for watching

Returns:

Raises:

  • (ArgumentError)

    if path is not a string



291
292
293
294
# File 'lib/libuv/loop.rb', line 291

# Get a new FSEvent instance.
#
# @param path [String] the path to the file or folder for watching
# @return [::Libuv::FSEvent]
# @raise [ArgumentError] if path is not a string (per the published docs;
#   the check itself is delegated to assert_type)
def fs_event(path)
    assert_type(String, path)
    FSEvent.new(@loop, path)
end

#handle ⇒ Object



82
# File 'lib/libuv/loop.rb', line 82

# Returns the value held in @pointer (the pointer given to #initialize).
def handle; @pointer; end

#idle(callback = nil, &block) ⇒ ::Libuv::Idle

Get a new Idle handle

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on idle trigger

Returns:



238
239
240
# File 'lib/libuv/loop.rb', line 238

# Get a new Idle handle for this loop.
#
# @param callback [Proc, nil] the callback to be called on idle trigger; the
#   block is used when no callback is supplied
# @return [::Libuv::Idle]
def idle(callback = nil, &block)
    Idle.new(@loop, callback || block)
end

#log(level, id, *args) ⇒ Object

Notifies the loop there was an event that should be logged

Parameters:

  • level (Symbol)

    the error level (info, warn, error etc)

  • id (Object)

    some kind of identifying information

  • *args (*args)

    any additional information



357
358
359
# File 'lib/libuv/loop.rb', line 357

# Notifies the loop there was an event that should be logged.
#
# The event is forwarded as a notification on @loop_notify.
# NOTE(review): @loop_notify is assigned in #run, so this presumably must not
# be called before the loop has been run at least once — confirm.
#
# @param level [Symbol] the error level (info, warn, error etc)
# @param id [Object] some kind of identifying information
# @param args [Array] any additional information
def log(level, id, *args)
    @loop_notify.notify(level, id, *args)
end

#lookup(hostname, hint = :IPv4, port = 9, &block) ⇒ ::Libuv::Dns

Lookup a hostname

Parameters:

  • hostname (String)

    the domain name to lookup

  • port (Integer, String) (defaults to: 9)

    the service being connected to

  • callback (Proc)

    the callback to be called on success

Returns:



280
281
282
283
284
# File 'lib/libuv/loop.rb', line 280

# Lookup a hostname.
#
# A Dns request is created for this loop; when a block is supplied it is
# registered on the request through #then before the request is returned.
#
# @param hostname [String] the domain name to lookup
# @param hint [Symbol] passed through to Dns as its last argument (defaults to :IPv4)
# @param port [Integer, String] the service being connected to
# @return [::Libuv::Dns] the lookup request
def lookup(hostname, hint = :IPv4, port = 9, &block)
    request = Dns.new(@loop, hostname, port, hint)
    request.then(block) unless block.nil?
    request
end

#lookup_error(err) ⇒ ::Libuv::Error

Lookup an error code and return it as an error object

Parameters:

  • err (Integer)

    The error code to look up.

Returns:



169
170
171
172
173
174
175
176
177
# File 'lib/libuv/loop.rb', line 169

# Lookup an error code and return it as an error object.
#
# The code is translated to a name and a message through ::Libuv::Ext, and the
# constant with that name under ::Libuv::Error is instantiated with the
# message. Any failure along the way is logged as a warning and answered with
# a ::Libuv::Error::UNKNOWN carrying whatever was resolved so far.
#
# @param err [Integer] The error code to look up.
# @return [::Libuv::Error] the matching error object, or UNKNOWN on failure
def lookup_error(err)
    begin
        code_name   = ::Libuv::Ext.err_name(err)
        description = ::Libuv::Ext.strerror(err)

        error_class = ::Libuv::Error.const_get(code_name.to_sym)
        error_class.new(description)
    rescue Exception => e
        @loop.log :warn, :error_lookup_failed, e
        ::Libuv::Error::UNKNOWN.new("error lookup failed for code #{err} #{code_name} #{description}")
    end
end

#next_tick(callback = nil, &block) ⇒ Object

Queue some work to be processed in the next iteration of the event loop (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/libuv/loop.rb', line 336

# Queue some work to be processed in the next iteration of the event loop
# (thread safe).
#
# The work is always pushed onto the run queue. Off the reactor thread the
# process-queue async handle is signalled; on the reactor thread the
# zero-delay next-tick timer is armed, at most once until it fires.
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   the block is used when no callback is supplied
# @raise [ArgumentError] if block is not given (per the published docs; the
#   check itself is delegated to assert_block)
def next_tick(callback = nil, &block)
    task = callback || block
    assert_block(task)
    @run_queue.push(task)

    # Other threads wake the loop through the async handle
    return @process_queue.call unless reactor_thread?

    # Already armed: the pending timer will pick this task up as well
    return if @next_tick_scheduled

    @next_tick.start(0)
    @next_tick_scheduled = true
end

#now ⇒ Fixnum

Get current time in milliseconds

Returns:

  • (Fixnum)


161
162
163
# File 'lib/libuv/loop.rb', line 161

# Get current time in milliseconds, as reported by ::Libuv::Ext.now for this
# loop's pointer.
#
# @return [Fixnum]
def now
    ::Libuv::Ext.now(@pointer)
end

#pipe(ipc = false) ⇒ ::Libuv::Pipe

Get a new Pipe instance

Parameters:

  • ipc (true, false) (defaults to: false)

    indicate if a handle will be used for ipc, useful for sharing tcp socket between processes

Returns:



208
209
210
# File 'lib/libuv/loop.rb', line 208

# Get a new Pipe instance.
#
# @param ipc [true, false] indicate if a handle will be used for ipc, useful
#   for sharing tcp socket between processes
# @return [::Libuv::Pipe]
def pipe(ipc = false)
    Pipe.new(@loop, ipc)
end

#prepare(callback = nil, &blk) ⇒ ::Libuv::Prepare

Get a new Prepare handle

Returns:



223
224
225
# File 'lib/libuv/loop.rb', line 223

# Get a new Prepare handle for this loop.
#
# @param callback [Proc, nil] handler passed to the handle; the block is used
#   when no callback is supplied
# @return [::Libuv::Prepare]
def prepare(callback = nil, &blk)
    Prepare.new(@loop, callback || blk)
end

#reactor_running? ⇒ Boolean

Tells you whether the Libuv reactor loop is currently running.

Returns:

  • (Boolean)


383
384
385
# File 'lib/libuv/loop.rb', line 383

# Tells you whether the Libuv reactor loop is currently running.
#
# Running is defined here as "a reactor thread is currently recorded".
#
# @return [Boolean]
def reactor_running?
    !@reactor_thread.nil?
end

#reactor_thread ⇒ Thread

Exposed to allow joining on the thread, when run in a multithreaded environment. Performing other actions on the thread has undefined semantics (read: a dangerous endeavor).

Returns:

  • (Thread)


376
377
378
# File 'lib/libuv/loop.rb', line 376

# Exposed to allow joining on the thread, when run in a multithreaded
# environment. Performing other actions on the thread has undefined semantics.
#
# @return [Thread, nil] the recorded reactor thread; nil when none is recorded
def reactor_thread
    @reactor_thread
end

#reactor_thread? ⇒ Boolean

True if the calling thread is the same thread as the reactor.

Returns:

  • (Boolean)


369
370
371
# File 'lib/libuv/loop.rb', line 369

# True if the calling thread is the same thread as the reactor.
#
# @return [Boolean]
def reactor_thread?
    @reactor_thread == Thread.current
end

#run(run_type = :UV_RUN_DEFAULT) {|promise| ... } ⇒ Object

Run the actual event loop. This method will block until the loop is stopped.

Parameters:

  • run_type (:UV_RUN_DEFAULT, :UV_RUN_ONCE, :UV_RUN_NOWAIT) (defaults to: :UV_RUN_DEFAULT)

Yield Parameters:

  • promise (::Libuv::Q::Promise)

    Yields a promise that can be used for logging unhandled exceptions on the loop.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/libuv/loop.rb', line 89

# Run the actual event loop. This method will block until the loop is stopped.
#
# When no reactor thread is recorded, the calling thread becomes the reactor
# thread: it is registered in LOOPS, the optional block is yielded the
# notification promise, and libuv is run. If the loop is already running, the
# block (if any) is handed to #schedule instead.
#
# @param run_type [Symbol] :UV_RUN_DEFAULT, :UV_RUN_ONCE or :UV_RUN_NOWAIT
# @yieldparam promise [::Libuv::Q::Promise] a promise that can be used for
#   logging unhandled exceptions on the loop
# @return [Loop] this loop
def run(run_type = :UV_RUN_DEFAULT)
    if @reactor_thread.nil?
        @loop_notify = @loop.defer
        thread = Thread.current

        begin
            @reactor_thread = thread
            LOOPS[thread] = @loop
            yield @loop_notify.promise if block_given?
            ::Libuv::Ext.run(@pointer, run_type)  # This is blocking
        ensure
            # The stop handler removes the registration itself, but the loop
            # can also return without #stop (:UV_RUN_ONCE / :UV_RUN_NOWAIT, or
            # an exception raised by the block). Clear it here too, so LOOPS
            # never maps this thread to a loop that is no longer running.
            # Only our own entry is removed.
            LOOPS.delete(thread) if LOOPS[thread].equal?(@loop)
            @reactor_thread = nil
            @run_queue.clear
        end
    elsif block_given?
        schedule { yield @loop_notify.promise }
    end
    @loop
end

#schedule(callback = nil, &block) ⇒ Object

Schedule some work to be processed on the event loop as soon as possible (thread safe)

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on the reactor thread

Raises:

  • (ArgumentError)

    if block is not given



320
321
322
323
324
325
326
327
328
329
330
# File 'lib/libuv/loop.rb', line 320

# Schedule some work to be processed on the event loop as soon as possible
# (thread safe).
#
# On the reactor thread the work is executed immediately and its result is
# returned. On any other thread it is pushed onto the run queue and the
# process-queue async handle is signalled.
#
# @param callback [Proc, nil] the callback to be called on the reactor thread;
#   the block is used when no callback is supplied
# @raise [ArgumentError] if block is not given (per the published docs; the
#   check itself is delegated to assert_block)
def schedule(callback = nil, &block)
    callback ||= block
    assert_block(callback)

    if reactor_thread?
        # Call the resolved callback, not the raw block: the block is nil
        # when the work was passed as a positional argument.
        callback.call
    else
        @run_queue << callback
        @process_queue.call
    end
end

#signal(signum = nil, callback = nil, &block) ⇒ ::Libuv::Signal

Get a new signal handler

Returns:



255
256
257
258
259
260
261
# File 'lib/libuv/loop.rb', line 255

# Get a new signal handler.
#
# The handler may be supplied as +callback+ or as a block (+callback+ wins).
# It is attached through the handle's #progress when present, and the handle
# is started only when a signal number is supplied.
#
# @param signum [Object, nil] signal to start listening for; nil leaves the
#   handle unstarted
# @param callback [Proc, nil] handler registered through #progress
# @return [::Libuv::Signal] the newly created handle
def signal(signum = nil, callback = nil, &block)
    listener = callback || block
    Signal.new(@loop).tap do |sig|
        sig.progress(listener) if listener
        sig.start(signum) if signum
    end
end

#stop ⇒ Object

Closes handles opened by the loop class and completes the current loop iteration (thread safe)



362
363
364
# File 'lib/libuv/loop.rb', line 362

# Closes handles opened by the loop class and completes the current loop
# iteration (thread safe).
#
# This only signals the @stop_loop async handle; the teardown itself lives in
# the block given to that handle in #initialize.
def stop
    @stop_loop.call
end

#tcp ⇒ ::Libuv::TCP

Get a new TCP instance

Returns:



182
183
184
# File 'lib/libuv/loop.rb', line 182

# Get a new TCP instance bound to this loop.
#
# @return [::Libuv::TCP]
def tcp
    TCP.new(@loop)
end

#timer(callback = nil, &blk) ⇒ ::Libuv::Timer

Get a new timer instance

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called on timer trigger

Returns:



216
217
218
# File 'lib/libuv/loop.rb', line 216

# Get a new timer instance.
#
# @param callback [Proc, nil] the callback to be called on timer trigger; the
#   block is used when no callback is supplied
# @return [::Libuv::Timer]
def timer(callback = nil, &blk)
    Timer.new(@loop, callback || blk)
end

#tty(fileno, readable = false) ⇒ ::Libuv::TTY

Get a new TTY instance

Parameters:

  • fileno (Integer)

    Integer file descriptor of a tty device

  • readable (true, false) (defaults to: false)

    Boolean indicating if TTY is readable

Returns:



198
199
200
201
202
# File 'lib/libuv/loop.rb', line 198

# Get a new TTY instance.
#
# @param fileno [Integer] Integer file descriptor of a tty device
# @param readable [true, false] Boolean indicating if TTY is readable
# @return [::Libuv::TTY]
def tty(fileno, readable = false)
    # Validate the descriptor type before handing it to the TTY handle
    assert_type(Integer, fileno, "io#fileno must return an integer file descriptor, #{fileno.inspect} given")

    TTY.new(@loop, fileno, readable)
end

#udp ⇒ ::Libuv::UDP

Get a new UDP instance

Returns:



189
190
191
# File 'lib/libuv/loop.rb', line 189

# Get a new UDP instance bound to this loop.
#
# @return [::Libuv::UDP]
def udp
    UDP.new(@loop)
end

#update_time ⇒ Object

Forces a loop time update, useful for getting more granular times

Returns:

  • nil



154
155
156
# File 'lib/libuv/loop.rb', line 154

# Forces a loop time update, useful for getting more granular times.
#
# Returns whatever ::Libuv::Ext.update_time returns (documented as nil).
def update_time
    ::Libuv::Ext.update_time(@pointer)
end

#work(callback = nil, &block) ⇒ ::Libuv::Work

Queue some work for processing in the libuv thread pool

Parameters:

  • callback (Proc) (defaults to: nil)

    the callback to be called in the thread pool

Returns:

Raises:

  • (ArgumentError)

    if block is not given



268
269
270
271
272
# File 'lib/libuv/loop.rb', line 268

# Queue some work for processing in the libuv thread pool.
#
# The work may be supplied as +callback+ or as a block (+callback+ wins).
#
# @param callback [Proc, nil] the callback to be called in the thread pool
# @return [::Libuv::Work] the queued work item, which is a promise object
# @raise [ArgumentError] if block is not given (per the published docs; the
#   check itself is delegated to assert_block)
def work(callback = nil, &block)
    job = callback || block
    assert_block(job)
    Work.new(@loop, job)
end