Class: ZeevexConcurrency::EventLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/zeevex_concurrency/event_loop.rb

Direct Known Subclasses

Inline

Defined Under Namespace

Classes: Inline, Null

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ EventLoop

Returns a new instance of EventLoop.



6
7
8
9
10
11
# File 'lib/zeevex_concurrency/event_loop.rb', line 6

def initialize(options = {})
  @options = options
  @mutex   = options.delete(:mutex) || Mutex.new
  @queue   = options.delete(:queue) || Queue.new
  @state   = :stopped
end

Instance Method Details

#<<(callable) ⇒ Object



54
55
56
# File 'lib/zeevex_concurrency/event_loop.rb', line 54

def <<(callable)
  enqueue(callable)
end

#backlogObject



62
63
64
# File 'lib/zeevex_concurrency/event_loop.rb', line 62

def backlog
  @queue.size
end

#enqueue(callable = nil, &block) ⇒ Object

Enqueue any callable object (including a Promise or Future or other Delayed class) to the event loop and return a Delayed object which can be used to fetch the return value.

Strictly obeys ordering.

Raises:

  • (ArgumentError)


45
46
47
48
49
50
51
52
# File 'lib/zeevex_concurrency/event_loop.rb', line 45

def enqueue(callable = nil, &block)
  to_run = callable || block
  raise ArgumentError, "Must provide proc or block arg" unless to_run

  to_run = ZeevexConcurrency::Promise.new(to_run) unless to_run.is_a?(ZeevexConcurrency::Delayed)
  @queue << to_run
  to_run
end

#flushObject



58
59
60
# File 'lib/zeevex_concurrency/event_loop.rb', line 58

def flush
  @queue.clear
end

#in_event_loop?Boolean

Returns true if the method was called from code executing on the event loop’s thread

Returns:

  • (Boolean)


75
76
77
# File 'lib/zeevex_concurrency/event_loop.rb', line 75

def in_event_loop?
  Thread.current.object_id == @thread.object_id
end

#on_event_loop(runnable = nil, &block) ⇒ Object

Runs a computation on the event loop. Does not deadlock if currently on the event loop, but will not preserve ordering either - it runs the computation immediately despite other events in the queue



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/zeevex_concurrency/event_loop.rb', line 84

def on_event_loop(runnable = nil, &block)
  return unless runnable || block_given?
  promise = (runnable && runnable.is_a?(ZeevexConcurrency::Delayed)) ?
             runnable :
             ZeevexConcurrency::Promise.create(runnable, &block)
  if in_event_loop?
    promise.call
    promise
  else
    enqueue promise, &block
  end
end

#resetObject



66
67
68
69
70
# File 'lib/zeevex_concurrency/event_loop.rb', line 66

def reset
  stop
  flush
  start
end

#run_and_wait(runnable = nil, &block) ⇒ Object

Returns the value from the computation rather than a Promise. Has similar semantics to ‘on_event_loop` - if this is called from the event loop, it just executes the computation synchronously ahead of any other queued computations



102
103
104
105
# File 'lib/zeevex_concurrency/event_loop.rb', line 102

def run_and_wait(runnable = nil, &block)
  promise = on_event_loop(runnable, &block)
  promise.value
end

#running?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/zeevex_concurrency/event_loop.rb', line 13

def running?
  @state == :started
end

#startObject



17
18
19
20
21
22
23
24
25
# File 'lib/zeevex_concurrency/event_loop.rb', line 17

def start
  return unless @state == :stopped
  @stop_requested = false
  @thread = Thread.new do
    process
  end

  @state = :started
end

#stopObject



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/zeevex_concurrency/event_loop.rb', line 27

def stop
  return unless @state == :started
  enqueue { @stop_requested = true }
  unless @thread.join(1)
    @thread.kill
    @thread.join(0)
  end

  @thread = nil
  @state = :stopped
end