Class: Fiber

Inherits:
Object
Defined in:
lib/polyphony/extensions/fiber.rb

Overview

Fiber extensions

Instance Attribute Details

#oob ⇒ Object

Returns the value of attribute oob.

# File 'lib/polyphony/extensions/fiber.rb', line 7

def oob
  @oob
end

#parent ⇒ Object

Returns the value of attribute parent.

# File 'lib/polyphony/extensions/fiber.rb', line 7

def parent
  @parent
end

#result ⇒ Object (readonly)

Returns the value of attribute result.

# File 'lib/polyphony/extensions/fiber.rb', line 8

def result
  @result
end

#tag ⇒ Object

Returns the value of attribute tag.

# File 'lib/polyphony/extensions/fiber.rb', line 7

def tag
  @tag
end

#thread ⇒ Object

Returns the value of attribute thread.

# File 'lib/polyphony/extensions/fiber.rb', line 7

def thread
  @thread
end

Class Method Details

.Fiber.await(f1, f2, ...) ⇒ Array<any> .Fiber.await(fibers) ⇒ Array<any> Also known as: join

Waits for all given fibers to terminate, then returns their return values in the order the fibers were given. If any of the awaited fibers terminates with an uncaught exception, Fiber.await terminates the remaining fibers, waits for them to finish, and then re-raises the first exception.

This method can be called with multiple fibers as multiple arguments, or with a single array containing one or more fibers.

Overloads:

  • .Fiber.await(f1, f2, ...) ⇒ Array<any>

    Return values of given fibers

    Parameters:

    • fibers (Array<Fiber>)

      fibers to wait for

    Returns:

    • (Array<any>)

      return values of given fibers

  • .Fiber.await(fibers) ⇒ Array<any>

    Return values of given fibers

    Parameters:

    • fibers (Array<Fiber>)

      fibers to wait for

    Returns:

    • (Array<any>)

      return values of given fibers



# File 'lib/polyphony/extensions/fiber.rb', line 557

def await(*fibers)
  return [] if fibers.empty?

  if (first = fibers.first).is_a?(Array)
    fibers = first
  end

  current_fiber = Fiber.current
  mailbox = current_fiber.monitor_mailbox
  results = {}
  fibers.each do |f|
    results[f] = nil
    if f.dead?
      # fiber already terminated, so queue message
      mailbox << [f, f.result]
    else
      f.monitor(current_fiber)
    end
  end
  exception = nil
  while !fibers.empty?
    (fiber, result) = mailbox.shift
    next unless fibers.include?(fiber)

    fibers.delete(fiber)
    current_fiber.remove_child(fiber) if fiber.parent == current_fiber
    if result.is_a?(Exception)
      exception ||= result
      fibers.each(&:terminate)
    else
      results[fiber] = result
    end
  end
  raise exception if exception

  results.values
end
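
The bookkeeping in the listing above can be tried out without Polyphony. The following is a minimal plain-Ruby sketch, not the library's implementation: await_model is a hypothetical name, symbols stand in for fibers, and an array of [sender, result] pairs stands in for the monitor mailbox. It leaves out the step that terminates the remaining fibers after an exception.

```ruby
# Plain-Ruby model of the result bookkeeping in Fiber.await.
# Assumption: every awaited sender eventually has a message in `mailbox`
# (the real mailbox blocks until one arrives).
def await_model(senders, mailbox)
  pending = senders.dup
  results = senders.to_h { |s| [s, nil] } # hash order follows argument order
  exception = nil
  until pending.empty?
    sender, result = mailbox.shift
    next unless pending.include?(sender) # ignore unrelated messages

    pending.delete(sender)
    if result.is_a?(Exception)
      exception ||= result # only the first exception is kept
    else
      results[sender] = result
    end
  end
  raise exception if exception

  results.values
end

ordered = await_model(%i[a b], [[:b, 2], [:x, 99], [:a, 1]]) # => [1, 2]
```

Results come back in argument order, not completion order, because the results hash is filled with placeholders before any message is read.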

.schedule_priority_oob_fiber(&block) ⇒ Object

Creates an out-of-band fiber that runs the supplied block and schedules it with priority. If an uncaught exception is raised while the fiber is running, it bubbles up to the main thread's main fiber, which is also scheduled with priority. This method is mainly used for trapping signals (see also the patched Kernel#trap).



# File 'lib/polyphony/extensions/fiber.rb', line 631

def schedule_priority_oob_fiber(&block)
  oob_fiber = Fiber.new do
    Fiber.current.setup_raw
    Thread.backend.trace(:unblock, oob_fiber, nil, @caller)
    result = block.call
  rescue Exception => e
    Thread.current.schedule_and_wakeup(Thread.main.main_fiber, e)
    result = e
  ensure
    Thread.backend.trace(:terminate, Fiber.current, result)
    suspend
  end
  prepare_oob_fiber(oob_fiber, block)
  Thread.backend.trace(:spin, oob_fiber, caller)
  oob_fiber.schedule_with_priority(nil)
end

.select(*fibers) ⇒ Array

Waits for at least one of the given fibers to terminate, returning an array containing the first terminated fiber and its return value. If that fiber terminated with an uncaught exception, the exception is re-raised. Returns nil if no fibers are given.

Parameters:

  • fibers (Array<Fiber>)

    Fibers to wait for

Returns:

  • (Array)

    Array containing the first terminated fiber and its return value



# File 'lib/polyphony/extensions/fiber.rb', line 602

def select(*fibers)
  return nil if fibers.empty?

  current_fiber = Fiber.current
  mailbox = current_fiber.monitor_mailbox
  fibers.each do |f|
    if f.dead?
      result = f.result
      result.is_a?(Exception) ? (raise result) : (return [f, result])
    end
  end

  fibers.each { |f| f.monitor(current_fiber) }
  while true
    (fiber, result) = mailbox.shift
    next unless fibers.include?(fiber)

    fibers.each { |f| f.unmonitor(current_fiber) }
    raise result if result.is_a?(Exception)

    return [fiber, result]
  end
end
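
The "first message wins" behaviour of the listing above can be sketched in plain Ruby. This is a model under stated assumptions, not Polyphony code: select_model is a hypothetical name, symbols stand in for fibers, and an array of [sender, result] pairs stands in for the monitor mailbox.

```ruby
# Plain-Ruby model of the loop in Fiber.select.
def select_model(senders, mailbox)
  return nil if senders.empty?

  loop do
    # Model-only guard: the real mailbox blocks until a message arrives.
    raise ArgumentError, 'mailbox exhausted' if mailbox.empty?

    sender, result = mailbox.shift
    next unless senders.include?(sender) # skip messages from other fibers

    raise result if result.is_a?(Exception)

    return [sender, result]
  end
end

winner = select_model(%i[a b], [[:x, 0], [:b, 2], [:a, 1]]) # => [:b, 2]
```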

Instance Method Details

#<<(msg) ⇒ Fiber

Sends a message to the given fiber. The message will be added to the fiber's mailbox.

Parameters:

  • msg (any)

Returns:

  • (Fiber)

    self

# File 'ext/polyphony/fiber.c', line 122

VALUE Fiber_send(VALUE self, VALUE msg) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  if (mailbox == Qnil) {
    mailbox = rb_funcall(cQueue, ID_new, 0);
    rb_ivar_set(self, ID_ivar_mailbox, mailbox);
  }
  Queue_push(mailbox, msg);
  return self;
}

#add_child(child_fiber) ⇒ Fiber

Adds a child fiber reference. Used internally.

Parameters:

  • child_fiber (Fiber)

    child fiber

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 366

def add_child(child_fiber)
  (@children ||= {})[child_fiber] = true
  child_fiber.monitor(self) if @supervise_mode
  self
end

#attach_all_children_to(parent) ⇒ Fiber

Attaches all child fibers to a new parent.

Parameters:

  • parent (Fiber)

    new parent

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 322

def attach_all_children_to(parent)
  child_fibers = @children&.keys
  child_fibers&.each { |c| c.attach_to(parent) }
  self
end

#attach_and_monitor(parent) ⇒ Fiber

Attaches the fiber to the new parent and monitors the new parent.

Parameters:

  • parent (Fiber)

    new parent

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 354

def attach_and_monitor(parent)
  @parent.remove_child(self)
  @parent = parent
  parent.add_child(self)
  monitor(parent)
  self
end

#attach_to(parent) ⇒ Fiber

Attaches the fiber to a new parent.

Parameters:

  • parent (Fiber)

    new parent

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 343

def attach_to(parent)
  @parent.remove_child(self)
  @parent = parent
  parent.add_child(self)
  self
end
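
The parent/child bookkeeping behind #add_child, #remove_child and #attach_to can be sketched with a small tree class. This is a plain-Ruby model, not Polyphony code: TreeNodeModel is a hypothetical class, and unlike the listing above it uses safe navigation so that a node without a parent can also be re-attached.

```ruby
# Plain-Ruby model of re-parenting: children are kept as hash keys,
# mirroring the @children hash used in the listings.
class TreeNodeModel
  attr_reader :parent

  def initialize(parent = nil)
    @parent = parent
    @children = {}
    parent&.add_child(self)
  end

  def children
    @children.keys
  end

  def add_child(child)
    @children[child] = true
    self
  end

  def remove_child(child)
    @children.delete(child)
    self
  end

  # Detach from the old parent first, then register with the new one.
  def attach_to(new_parent)
    @parent&.remove_child(self)
    @parent = new_parent
    new_parent.add_child(self)
    self
  end
end

root = TreeNodeModel.new
a = TreeNodeModel.new(root)
b = TreeNodeModel.new(root)
b.attach_to(a) # root now has one child (a); a has one child (b)
```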

#await ⇒ any #join ⇒ any #value ⇒ any

Waits for the fiber to terminate, and returns its return value (the result of its last statement). If the fiber has terminated with an uncaught exception, the exception will be raised.

f = spin { :foo; :bar }
f.await #=> :bar

Overloads:

  • #await ⇒ any

    Returns fiber's return value.

    Returns:

    • (any)

      fiber's return value

  • #join ⇒ any

    Returns fiber's return value.

    Returns:

    • (any)

      fiber's return value

  • #value ⇒ any

    Returns fiber's return value.

    Returns:

    • (any)

      fiber's return value



# File 'lib/polyphony/extensions/fiber.rb', line 198

def await
  Fiber.await(self).first
end

#await_all_children ⇒ Array<any>

Blocks until all child fibers have terminated, then returns their return values. Returns nil if the fiber has no children.

Returns:

  • (Array<any>)

    return values of child fibers



# File 'lib/polyphony/extensions/fiber.rb', line 294

def await_all_children
  return unless @children && !@children.empty?

  Fiber.await(@children.keys.reject(&:dead?))
end

#caller ⇒ Array<String>

Returns the fiber's caller.

Returns:

  • (Array<String>)

    caller



# File 'lib/polyphony/extensions/fiber.rb', line 43

def caller
  spin_caller = @caller || []
  if @parent
    spin_caller + @parent.caller
  else
    spin_caller
  end
end
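
The listing above builds the caller by appending the parent's caller to the fiber's own, recursively. A minimal plain-Ruby sketch of that chain, assuming a hypothetical FiberNode struct in place of a real fiber and made-up file locations:

```ruby
# Plain-Ruby model of the caller chain: own entries first, then the
# parent's, all the way up to a root node without a parent.
FiberNode = Struct.new(:own_caller, :parent) do
  def caller
    own = own_caller || []
    parent ? own + parent.caller : own
  end
end

root = FiberNode.new(nil, nil)
child = FiberNode.new(['app.rb:10'], root)
grandchild = FiberNode.new(['worker.rb:3'], child)
chain = grandchild.caller # => ["worker.rb:3", "app.rb:10"]
```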

#cancel(exception = Polyphony::Cancel) ⇒ Fiber

Stops the fiber by raising an exception in its context. By default, a Polyphony::Cancel exception is raised.

Parameters:

  • exception (Class, Exception) (defaults to: Polyphony::Cancel)

    exception or exception class

Returns:

  • (Fiber)

    self (nil if the fiber is no longer running)

# File 'lib/polyphony/extensions/fiber.rb', line 119

def cancel(exception = Polyphony::Cancel)
  return if @running == false

  value = exception.is_a?(Class) ? exception.new : exception
  schedule value
  self
end
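
The exception argument may be either a class or an instance. A minimal plain-Ruby sketch of that normalization step follows; normalize_exception is a hypothetical helper name, and CancelModel is a stand-in for Polyphony::Cancel so the snippet runs without the gem.

```ruby
# Stand-in for Polyphony::Cancel (assumption for illustration only).
class CancelModel < StandardError; end

# A class is instantiated; an existing exception instance is used as-is.
def normalize_exception(exception = CancelModel)
  exception.is_a?(Class) ? exception.new : exception
end

from_default = normalize_exception                       # a CancelModel instance
from_class = normalize_exception(ArgumentError)          # an ArgumentError instance
from_instance = normalize_exception(RuntimeError.new('stop')) # the same instance
```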

#children ⇒ Array<Fiber>

Returns the fiber's children.

Returns:

  • (Array<Fiber>)

    child fibers



# File 'lib/polyphony/extensions/fiber.rb', line 255

def children
  (@children ||= {}).keys
end

#dead? ⇒ bool

Returns true if the fiber is dead.

Returns:

  • (bool)

    is fiber dead



# File 'lib/polyphony/extensions/fiber.rb', line 538

def dead?
  state == :dead
end

#detach ⇒ Fiber

Detaches the fiber from its current parent. The fiber will be made a child of the main fiber (for the current thread).

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 332

def detach
  @parent.remove_child(self)
  @parent = @thread.main_fiber
  @parent.add_child(self)
  self
end

#finalize(result, uncaught_exception: false) ⇒ false

Finalizes the fiber, handling its return value or any uncaught exception.

Parameters:

  • result (any)

    return value

  • uncaught_exception (bool) (defaults to: false)

    whether the fiber terminated with an uncaught exception

Returns:

  • (false)


# File 'lib/polyphony/extensions/fiber.rb', line 462

def finalize(result, uncaught_exception: false)
  result, uncaught_exception = finalize_children(result, uncaught_exception)
  Thread.backend.trace(:terminate, self, result)
  @result = result

  inform_monitors(result, uncaught_exception)
  @running = false
ensure
  @parent&.remove_child(self)
  # Prevent fiber from being resumed after terminating
  @thread.fiber_unschedule(self)
  Thread.current.switch_fiber
end

#finalize_children(result, uncaught_exception) ⇒ Array

Shuts down all children of the current fiber. If any exception occurs while the children are shut down, it is returned along with the uncaught_exception flag set. Otherwise, it returns the given arguments.

Parameters:

  • result (any)

    fiber's return value

  • uncaught_exception (bool)

    whether the fiber terminated with an uncaught exception

Returns:

  • (Array)

    array containing result and uncaught exception if any



# File 'lib/polyphony/extensions/fiber.rb', line 483

def finalize_children(result, uncaught_exception)
  shutdown_all_children(graceful: graceful_shutdown?)
  [result, uncaught_exception]
rescue Exception => e
  [e, true]
end

#graceful_shutdown=(graceful) ⇒ bool

Sets the graceful shutdown flag for the fiber.

Parameters:

  • graceful (bool)

    Whether or not to perform a graceful shutdown

Returns:

  • (bool)

    graceful



# File 'lib/polyphony/extensions/fiber.rb', line 131

def graceful_shutdown=(graceful)
  @graceful_shutdown = graceful
end

#graceful_shutdown? ⇒ bool

Returns the graceful shutdown flag for the fiber.

Returns:

  • (bool)

    true if graceful shutdown, otherwise false



# File 'lib/polyphony/extensions/fiber.rb', line 138

def graceful_shutdown?
  @graceful_shutdown
end

#inform_monitors(result, uncaught_exception) ⇒ Fiber

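Informs the fiber's monitors that it has terminated. If the fiber terminated with an uncaught exception and its parent is not among the monitors, the parent is scheduled with priority, with the exception as its resume value.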

Parameters:

  • result (any)

    fiber's return value

  • uncaught_exception (bool)

    whether the fiber terminated with an uncaught exception

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 495

def inform_monitors(result, uncaught_exception)
  if @monitors
    msg = [self, result]
    @monitors.each_key { |f| f.monitor_mailbox << msg }
  end

  if uncaught_exception && @parent
    parent_is_monitor = @monitors&.key?(@parent)
    @parent.schedule_with_priority(result) unless parent_is_monitor
  end

  self
end
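
The notification rule in the listing above can be modelled in plain Ruby. This is a sketch, not Polyphony code: MonitoredModel is a hypothetical class, an array stands in for each monitor mailbox, and priority_values merely records what would be passed to #schedule_with_priority.

```ruby
# Plain-Ruby model of inform_monitors: every monitor gets [sender, result];
# the parent is additionally woken up with the exception only when it is
# not already a monitor.
class MonitoredModel
  attr_reader :inbox, :priority_values

  def initialize(parent = nil)
    @parent = parent
    @monitors = {}
    @inbox = []           # stands in for the monitor mailbox
    @priority_values = [] # values this node was "scheduled with priority" with
  end

  def monitor(other)
    @monitors[other] = true
    self
  end

  def schedule_with_priority(value)
    @priority_values << value
    self
  end

  def terminate_with(result, uncaught_exception: false)
    msg = [self, result]
    @monitors.each_key { |m| m.inbox << msg }
    if uncaught_exception && @parent && !@monitors.key?(@parent)
      @parent.schedule_with_priority(result)
    end
    self
  end
end

parent = MonitoredModel.new
watcher = MonitoredModel.new
child = MonitoredModel.new(parent)
child.monitor(watcher)
child.terminate_with(RuntimeError.new('boom'), uncaught_exception: true)
# watcher.inbox now holds one message; parent.priority_values holds the error
```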

#inspect ⇒ String Also known as: to_s

Returns a string representation of the fiber for debugging.

Returns:

  • (String)

    string representation



# File 'lib/polyphony/extensions/fiber.rb', line 20

def inspect
  if @tag
    "#<Fiber #{tag}:#{object_id} #{location} (#{state})>"
  else
    "#<Fiber:#{object_id} #{location} (#{state})>"
  end
end

#interject(&block) ⇒ Fiber

Adds an interjection to the fiber. The fiber's current operation is interrupted, the given block is executed, and the operation is then resumed. This API is experimental and might be removed in the future.

Returns:

Raises:



# File 'lib/polyphony/extensions/fiber.rb', line 181

def interject(&block)
  raise Polyphony::Interjection.new(block)
end

#interrupt(value = nil) ⇒ Fiber Also known as: stop, kill, move_on

Stops the fiber by raising a Polyphony::MoveOn exception. The given value will become the fiber's return value.

Parameters:

  • value (any) (defaults to: nil)

    fiber's eventual return value

Returns:

  • (Fiber)

    self (nil if the fiber is no longer running)

# File 'lib/polyphony/extensions/fiber.rb', line 85

def interrupt(value = nil)
  return if @running == false

  schedule Polyphony::MoveOn.new(value)
  self
end

#location ⇒ String

Returns the source location for the fiber based on its caller.

Returns:

  • (String)

    source location



# File 'lib/polyphony/extensions/fiber.rb', line 32

def location
  if @oob
    "#{@caller[0]} (oob)"
  else
    @caller ? @caller[0] : '(root)'
  end
end

#mailbox ⇒ Queue

Returns the fiber's mailbox.

Returns:

  • (Queue)


# File 'ext/polyphony/fiber.c', line 170

VALUE Fiber_mailbox(VALUE self) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  if (mailbox == Qnil) {
    mailbox = rb_funcall(cQueue, ID_new, 0);
    rb_ivar_set(self, ID_ivar_mailbox, mailbox);
  }
  return mailbox;
}

#main? ⇒ bool

Returns true if the fiber is the main fiber for its thread.

Returns:

  • (bool)

    is main fiber



# File 'lib/polyphony/extensions/fiber.rb', line 64

def main?
  @main
end

#monitor(fiber) ⇒ Fiber

Adds a fiber to the list of monitoring fibers. Monitoring fibers will be notified on their monitor mailboxes when the fiber is terminated.

Parameters:

  • fiber (Fiber)

    monitoring fiber

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 514

def monitor(fiber)
  (@monitors ||= {})[fiber] = true
  self
end

#monitor_mailbox ⇒ Polyphony::Queue

Returns the fiber's monitoring mailbox queue, used for receiving fiber monitoring messages.

Returns:

  • (Polyphony::Queue)

    monitoring mailbox queue

# File 'lib/polyphony/extensions/fiber.rb', line 76

def monitor_mailbox
  @monitor_mailbox ||= Polyphony::Queue.new
end

#monitors ⇒ Array<Fiber>

Returns the list of monitoring fibers.

Returns:

  • (Array<Fiber>)

    monitoring fibers



# File 'lib/polyphony/extensions/fiber.rb', line 531

def monitors
  @monitors&.keys || []
end

#prepare(tag, block, caller, parent) ⇒ Fiber

Prepares a fiber for running.

Parameters:

  • tag (any)

    fiber's tag

  • block (Proc)

    fiber's block

  • caller (Array<String>)

    fiber's caller

  • parent (Fiber)

    fiber's parent

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 392

def prepare(tag, block, caller, parent)
  @thread = Thread.current
  @tag = tag
  @parent = parent
  @caller = caller
  @block = block
  Thread.backend.trace(:spin, self, Kernel.caller[1..])
  schedule
  self
end

#fiber.raise(message) ⇒ Fiber #fiber.raise(exception_class) ⇒ Fiber #fiber.raise(exception_class, exception_message) ⇒ Fiber #fiber.raise(exception) ⇒ Fiber

Raises an exception in the context of the fiber.

Overloads:

  • #fiber.raise(message) ⇒ Fiber

    Returns self.

    Parameters:

    • message (String)

      error message

    Returns:

    • (Fiber)

      self

  • #fiber.raise(exception_class) ⇒ Fiber

    Returns self.

    Parameters:

    • exception_class (Class)

      exception class to raise

    Returns:

    • (Fiber)

      self

  • #fiber.raise(exception_class, exception_message) ⇒ Fiber

    Returns self.

    Parameters:

    • exception_class (Class)

      exception class to raise

    • exception_message (String)

      exception message to raise

    Returns:

    • (Fiber)

      self

  • #fiber.raise(exception) ⇒ Fiber

    Returns self.

    Parameters:

    • exception (Exception)

      exception to raise

    Returns:

    • (Fiber)

      self

# File 'lib/polyphony/extensions/fiber.rb', line 169

def raise(*args)
  error = Exception.instantiate(*args)
  schedule(error)
  self
end

#receive ⇒ any

Receives a message from the fiber's mailbox. If no message is available, waits for a message to be sent to it.

Returns:

  • (any)

    received message



# File 'ext/polyphony/fiber.c', line 138

VALUE Fiber_receive(VALUE self) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  if (mailbox == Qnil) {
    mailbox = rb_funcall(cQueue, ID_new, 0);
    rb_ivar_set(self, ID_ivar_mailbox, mailbox);
  }
  return Queue_shift(0, 0, mailbox);
}

#receive_all_pending ⇒ Array

Receives all messages currently in the fiber's mailbox.

Returns:

  • (Array)


# File 'ext/polyphony/fiber.c', line 184

VALUE Fiber_receive_all_pending(VALUE self) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  return (mailbox == Qnil) ? rb_ary_new() : Queue_shift_all(mailbox);
}
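
The C listings for #<<, #receive and #receive_all_pending share one pattern: the mailbox is created on first use, except that #receive_all_pending returns an empty array when no mailbox exists yet. A plain-Ruby sketch of that pattern follows. MailboxModel is a hypothetical class, and Ruby's core Queue stands in for the queue class used by the extension; nothing here is Polyphony's own code.

```ruby
# Plain-Ruby model of the lazily created fiber mailbox.
class MailboxModel
  # Appends a message; returns self so sends can be chained.
  def <<(msg)
    mailbox << msg
    self
  end

  # Creates the mailbox on first access.
  def mailbox
    @mailbox ||= Queue.new
  end

  # Removes and returns the oldest message. Like Queue#pop, this blocks
  # when the mailbox is empty.
  def receive
    mailbox.pop
  end

  # Drains everything queued so far without blocking and without
  # creating a mailbox if none exists yet.
  def receive_all_pending
    return [] unless @mailbox

    pending = []
    pending << @mailbox.pop until @mailbox.empty?
    pending
  end
end

box = MailboxModel.new
box << :hello << :world
first = box.receive             # => :hello
rest = box.receive_all_pending  # => [:world]
```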

#receive_loop ⇒ Object

Receives messages from the fiber's mailbox in an infinite loop, yielding each message to the given block.



# File 'ext/polyphony/fiber.c', line 151

noreturn VALUE Fiber_receive_loop(VALUE self) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  if (mailbox == Qnil) {
    mailbox = rb_funcall(cQueue, ID_new, 0);
    rb_ivar_set(self, ID_ivar_mailbox, mailbox);
  }

  while (1) {
    VALUE msg = Queue_shift(0, 0,mailbox);
    rb_yield(msg);
    RB_GC_GUARD(msg);
  }
}

#remove_child(child_fiber) ⇒ Fiber

Removes a child fiber reference. Used internally.

Parameters:

  • child_fiber (Fiber)

    child fiber to be removed

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 376

def remove_child(child_fiber)
  @children&.delete(child_fiber)
  self
end

#restart(value = nil) ⇒ Fiber Also known as: reset

Restarts the fiber, with the given value serving as the first value passed to the fiber's block.

Parameters:

  • value (any) (defaults to: nil)

    value passed to fiber block

Returns:

  • (Fiber)

    restarted fiber



# File 'lib/polyphony/extensions/fiber.rb', line 100

def restart(value = nil)
  raise "Can't restart main fiber" if @main

  if @running
    schedule Polyphony::Restart.new(value)
    return self
  end

  fiber = parent.spin(@tag, @caller, &@block)
  @monitors&.each_key { |f| fiber.monitor(f) }
  fiber.schedule(value) unless value.nil?
  fiber
end

#restart_self(first_value) ⇒ any

Resets the fiber's state and reruns the fiber.

Parameters:

  • first_value (any)

    first value to pass to the fiber after restarting

Returns:

  • (any)

    fiber result



# File 'lib/polyphony/extensions/fiber.rb', line 452

def restart_self(first_value)
  @mailbox = nil
  run(first_value)
end

#run(first_value) ⇒ any

Runs the fiber's block and handles uncaught exceptions.

Parameters:

  • first_value (any)

    value passed to fiber on first resume

Returns:

  • (any)

    fiber result



# File 'lib/polyphony/extensions/fiber.rb', line 407

def run(first_value)
  Kernel.raise first_value if first_value.is_a?(Exception)
  @running = true

  Thread.backend.trace(:unblock, self, first_value, @caller)
  result = @block.(first_value)
  finalize(result)
  result
rescue Polyphony::Restart => e
  restart_self(e.value)
rescue Polyphony::MoveOn, Polyphony::Terminate => e
  finalize(e.value)
rescue Exception => e
  e.source_fiber = self
  finalize(e, uncaught_exception: true)
end
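
The rescue clauses in the listing above map each way a fiber can end onto a result value. A plain-Ruby sketch of that mapping, assuming a hypothetical MoveOnModel exception in place of Polyphony::MoveOn and leaving out restarts and finalization:

```ruby
# Stand-in for Polyphony::MoveOn: carries the value the fiber should
# return when it is stopped (assumption for illustration only).
class MoveOnModel < StandardError
  attr_reader :value

  def initialize(value = nil)
    super()
    @value = value
  end
end

# Returns [result, uncaught_exception] for the given block:
# - normal completion: the block's value, false
# - stopped via MoveOnModel: the carried value, false
# - any other exception: the exception itself, true
def run_model
  [yield, false]
rescue MoveOnModel => e
  [e.value, false]
rescue Exception => e # mirrors the broad rescue in the listing
  [e, true]
end

normal = run_model { 42 }                              # => [42, false]
stopped = run_model { raise MoveOnModel.new(:stopped) } # => [:stopped, false]
failed = run_model { raise ArgumentError, 'bad' }       # exception object, true
```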

#running? ⇒ bool

Returns true if fiber is running.

Returns:

  • (bool)

    is fiber running



# File 'lib/polyphony/extensions/fiber.rb', line 13

def running?
  @running
end

#schedule(value) ⇒ Fiber #schedule ⇒ Fiber

Adds the fiber to the runqueue with the given resume value. If no resume value is given, the fiber will be resumed with nil.

Overloads:

  • #schedule(value) ⇒ Fiber

    Returns scheduled fiber.

    Parameters:

    • value (any)

      resume value

    Returns:

    • (Fiber)

      scheduled fiber

  • #schedule ⇒ Fiber

    Returns scheduled fiber.

    Returns:

    • (Fiber)

      scheduled fiber



# File 'ext/polyphony/fiber.c', line 73

static VALUE Fiber_schedule(int argc, VALUE *argv, VALUE self) {
  VALUE value = (argc == 0) ? Qnil : argv[0];
  Fiber_make_runnable(self, value);
  return self;
}

#schedule_with_priority(value) ⇒ Fiber #schedule_with_priority ⇒ Fiber

Adds the fiber to the head of the runqueue with the given resume value. If no resume value is given, the fiber will be resumed with nil.

Overloads:

  • #schedule_with_priority(value) ⇒ Fiber

    Returns scheduled fiber.

    Parameters:

    • value (any)

      resume value

    Returns:

    • (Fiber)

      scheduled fiber

  • #schedule_with_priority ⇒ Fiber

    Returns scheduled fiber.

    Returns:

    • (Fiber)

      scheduled fiber



# File 'ext/polyphony/fiber.c', line 89

static VALUE Fiber_schedule_with_priority(int argc, VALUE *argv, VALUE self) {
  VALUE value = (argc == 0) ? Qnil : argv[0];
  Fiber_make_runnable_with_priority(self, value);
  return self;
}
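
The difference between #schedule and #schedule_with_priority is where the fiber lands in the runqueue. A plain-Ruby sketch of that ordering, assuming the runqueue behaves like a simple FIFO list; RunqueueModel is a hypothetical class, and other details of the real runqueue are not modelled.

```ruby
# Plain-Ruby model of runqueue ordering: entries are [fiber, resume_value].
class RunqueueModel
  def initialize
    @entries = []
  end

  # Normal scheduling appends to the tail.
  def schedule(fiber, value = nil)
    @entries.push([fiber, value])
    self
  end

  # Priority scheduling inserts at the head, so the fiber runs next.
  def schedule_with_priority(fiber, value = nil)
    @entries.unshift([fiber, value])
    self
  end

  # Removes and returns the next entry to resume, or nil when empty.
  def next_entry
    @entries.shift
  end
end

queue = RunqueueModel.new
queue.schedule(:worker, 1).schedule(:logger)
queue.schedule_with_priority(:signal_handler, :int)
first = queue.next_entry # => [:signal_handler, :int]
```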

#send(msg) ⇒ Fiber

Sends a message to the given fiber. The message will be added to the fiber's mailbox.

Parameters:

  • msg (any)

Returns:

  • (Fiber)

    self

# File 'ext/polyphony/fiber.c', line 122

VALUE Fiber_send(VALUE self, VALUE msg) {
  VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
  if (mailbox == Qnil) {
    mailbox = rb_funcall(cQueue, ID_new, 0);
    rb_ivar_set(self, ID_ivar_mailbox, mailbox);
  }
  Queue_push(mailbox, msg);
  return self;
}

#set_caller(caller) ⇒ Fiber

Sets the fiber's caller.

Parameters:

  • caller (Array<String>)

    new caller

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 56

def set_caller(caller)
  @caller = caller
  self
end

#setup_main_fiber ⇒ Fiber

Sets up the fiber as the main fiber for the current thread.

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 439

def setup_main_fiber
  @main = true
  @tag = :main
  @thread = Thread.current
  @running = true
  @children&.clear
  self
end

#setup_raw ⇒ Fiber

Performs setup for a "raw" Fiber created using Fiber.new. Note that this fiber is an orphan fiber (has no parent), since we cannot control how the fiber terminates after it has already been created. Calling #setup_raw allows the fiber to be scheduled and to receive messages.

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 430

def setup_raw
  @thread = Thread.current
  @running = true
  self
end

#shutdown_all_children(graceful: false) ⇒ Fiber

Terminates all child fibers and blocks until they have terminated.

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 303

def shutdown_all_children(graceful: false)
  return self unless @children

  pending = []
  child_fibers = @children.keys
  child_fibers.each do |c|
    next if c.dead?

    c.terminate(graceful:)
    pending << c
  end
  Fiber.await(pending)
  self
end

#spin(tag = nil, orig_caller = Kernel.caller, &block) ⇒ Fiber

Creates a new child fiber.

child = fiber.spin { sleep 10; fiber.stop }

Parameters:

  • tag (any) (defaults to: nil)

    child fiber's tag

  • orig_caller (Array<String>) (defaults to: Kernel.caller)

    caller to set for fiber

Returns:

  • (Fiber)

    child fiber



# File 'lib/polyphony/extensions/fiber.rb', line 266

def spin(tag = nil, orig_caller = Kernel.caller, &block)
  f = Fiber.new { |v| f.run(v) }
  f.prepare(tag, block, orig_caller, self)
  (@children ||= {})[f] = true
  f.monitor(self) if @supervise_mode
  f
end

#state ⇒ Symbol

Returns the current state for the fiber, one of the following:

  • :running - the fiber is currently running.
  • :runnable - the fiber is on the runqueue, scheduled to be resumed ("run").
  • :waiting - the fiber is waiting on some blocking operation to complete, allowing other fibers to run.
  • :dead - the fiber has finished running.

Returns:

  • (Symbol)


# File 'ext/polyphony/fiber.c', line 106

static VALUE Fiber_state(VALUE self) {
  if (!rb_fiber_alive_p(self) || (rb_ivar_get(self, ID_ivar_running) == Qfalse))
    return SYM_dead;
  if (rb_fiber_current() == self) return SYM_running;
  if (rb_ivar_get(self, ID_ivar_runnable) != Qnil) return SYM_runnable;

  return SYM_waiting;
}
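
The C listing above checks the states in a fixed order, so :dead takes precedence over :running, which takes precedence over :runnable. A plain-Ruby sketch of that precedence; fiber_state and its keyword arguments are hypothetical names standing in for the checks made by the extension.

```ruby
# Plain-Ruby model of the state precedence.
#   alive        - whether the underlying fiber is still alive
#   running_flag - the fiber's running flag (only an explicit false means dead)
#   current      - whether this is the currently executing fiber
#   runnable     - the runnable marker (nil when not on the runqueue)
def fiber_state(alive:, running_flag:, current:, runnable:)
  return :dead if !alive || running_flag == false
  return :running if current
  return :runnable unless runnable.nil?

  :waiting
end

state = fiber_state(alive: true, running_flag: true, current: false, runnable: nil)
# => :waiting
```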

#supervise(*fibers, **opts, &block) ⇒ Object

Supervises the given fibers or all child fibers. The fiber is put in supervision mode, which means any child added after calling #supervise will automatically be supervised. Depending on the given options, fibers may be automatically restarted.

If a block is given, the block is called whenever a supervised fiber has terminated. If the :on_done option is given, that proc will be called when a supervised fiber has terminated. If the :on_error option is given, that proc will be called when a supervised fiber has terminated with an uncaught exception. If the :restart option equals :always, fibers will always be restarted. If the :restart option equals :on_error, fibers will be restarted only when terminated with an uncaught exception.

This method blocks indefinitely.

Parameters:

  • fibers (Array<Fiber>)

    fibers to supervise

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :on_done (Proc, nil)

    proc to call when a supervised fiber is terminated

  • :on_error (Proc, nil)

    proc to call when a supervised fiber is terminated with an exception

  • :restart (:always, :on_error, nil)

    whether to restart terminated fibers



# File 'lib/polyphony/extensions/fiber.rb', line 228

def supervise(*fibers, **opts, &block)
  block ||= supervise_opts_to_block(opts)

  @supervise_mode = true
  fibers = children if fibers.empty?
  fibers.each do |f|
    f.attach_to(self) unless f.parent == self
    f.monitor(self)
  end

  mailbox = monitor_mailbox

  while true
    (fiber, result) = mailbox.shift
    block&.call(fiber, result)
  end
ensure
  @supervise_mode = false
end
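
The :restart option described above can be read as a small decision rule. A plain-Ruby sketch follows, assuming that a supervised fiber's result is an Exception instance when it terminated with an uncaught exception (as in the Fiber.await listing). should_restart? is a hypothetical helper, not part of Polyphony, and the helper that turns options into a block is not shown in this page.

```ruby
# Plain-Ruby model of the restart policy:
#   :always   - restart regardless of how the fiber ended
#   :on_error - restart only after an uncaught exception
#   nil       - never restart
def should_restart?(policy, result)
  case policy
  when :always then true
  when :on_error then result.is_a?(Exception)
  else false
  end
end

after_crash = should_restart?(:on_error, RuntimeError.new('boom')) # => true
after_normal_exit = should_restart?(:on_error, :done)              # => false
```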

#terminate(graceful: false) ⇒ Fiber

Terminates the fiber, optionally setting the graceful shutdown flag.

Parameters:

  • graceful (bool) (defaults to: false)

    Whether to perform a graceful shutdown

Returns:

  • (Fiber)

    self (nil if the fiber is no longer running)

# File 'lib/polyphony/extensions/fiber.rb', line 146

def terminate(graceful: false)
  return if @running == false

  @graceful_shutdown = graceful
  schedule Polyphony::Terminate.new
  self
end

#terminate_all_children(graceful: false) ⇒ Fiber

Terminates all child fibers. This method will return before the fibers are actually terminated.

Parameters:

  • graceful (bool) (defaults to: false)

    whether to perform a graceful termination

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 279

def terminate_all_children(graceful: false)
  return self unless @children

  e = Polyphony::Terminate.new
  @children.each_key do |c|
    c.graceful_shutdown = true if graceful
    c.raise e
  end
  self
end

#unmonitor(fiber) ⇒ Fiber

Removes a monitor fiber.

Parameters:

  • fiber (Fiber)

    monitoring fiber

Returns:

  • (Fiber)

    self

# File 'lib/polyphony/extensions/fiber.rb', line 523

def unmonitor(fiber)
  (@monitors ||= []).delete(fiber)
  self
end
