Module: Concurrent

Defined in:
lib/concurrent.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/actor.rb,
lib/concurrent/agent.rb,
lib/concurrent/event.rb,
lib/concurrent/atomic.rb,
lib/concurrent/future.rb,
lib/concurrent/channel.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/postable.rb,
lib/concurrent/runnable.rb,
lib/concurrent/condition.rb,
lib/concurrent/stoppable.rb,
lib/concurrent/utilities.rb,
lib/concurrent/obligation.rb,
lib/concurrent/supervisor.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/count_down_latch.rb,
lib/concurrent/thread_local_var.rb,
lib/concurrent/fixed_thread_pool.rb,
lib/concurrent/cached_thread_pool.rb,
lib/concurrent/global_thread_pool.rb,
lib/concurrent/immediate_executor.rb,
lib/concurrent/safe_task_executor.rb,
lib/concurrent/fixed_thread_pool/worker.rb,
lib/concurrent/cached_thread_pool/worker.rb,
lib/concurrent/copy_on_write_observer_set.rb,
lib/concurrent/copy_on_notify_observer_set.rb

Overview

Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.

The design goals of this gem are:

  • Stay true to the spirit of the languages providing inspiration

  • But implement in a way that makes sense for Ruby

  • Keep the semantics as idiomatic Ruby as possible

  • Support features that make sense in Ruby

  • Exclude features that don’t make sense in Ruby

  • Be small, lean, and loosely coupled

Defined Under Namespace

Modules: Dereferenceable, Obligation, Postable, Runnable, Stoppable, ThreadLocalJavaStorage, ThreadLocalNewStorage, ThreadLocalOldStorage, ThreadLocalSymbolAllocator, UsesGlobalThreadPool Classes: Actor, Agent, AtomicFixnum, CachedThreadPool, Channel, Condition, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, Event, FixedThreadPool, Future, IVar, ImmediateExecutor, MVar, NullThreadPool, Promise, SafeTaskExecutor, ScheduledTask, Supervisor, ThreadLocalVar, TimerTask

Constant Summary collapse

MultipleAssignmentError =
Class.new(StandardError)
VERSION =
'0.5.0'
TimeoutError =

Error raised when an operations times out.

Class.new(StandardError)

Class Method Summary collapse

Class Method Details

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Dataflow allows you to create a task that will be scheduled then all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka’s flow and Habanero Java’s DataDrivenFuture. However unlike Akka we don’t schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 80s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Examples:

Parallel Fibonacci calculator

def fib(n)
  if n < 2
    Concurrent::dataflow { n }
  else
    n1 = fib(n - 1)
    n2 = fib(n - 2)
    Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
  end
end

f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...

# wait up to 1 second for the answer...
f.value(1) #=> 377

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/concurrent/dataflow.rb', line 62

def dataflow(*inputs, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end

.timeout(seconds) ⇒ Object

Note:

This method is intended to be a simpler and more reliable replacement

Wait the given number of seconds for the block operation to complete.

to the Ruby standard library Timeout::timeout method.

Parameters:

  • seconds (Integer)

    The number of seconds to wait

Returns:

  • The result of the block operation

Raises:

  • Concurrent::TimeoutError when the block operation does not complete in the allotted number of seconds.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/concurrent/utilities.rb', line 19

def timeout(seconds)

  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end