Module: Concurrent
- Defined in:
- lib/concurrent.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/actor.rb,
lib/concurrent/agent.rb,
lib/concurrent/event.rb,
lib/concurrent/atomic.rb,
lib/concurrent/future.rb,
lib/concurrent/channel.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/postable.rb,
lib/concurrent/runnable.rb,
lib/concurrent/condition.rb,
lib/concurrent/stoppable.rb,
lib/concurrent/utilities.rb,
lib/concurrent/obligation.rb,
lib/concurrent/supervisor.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/count_down_latch.rb,
lib/concurrent/thread_local_var.rb,
lib/concurrent/fixed_thread_pool.rb,
lib/concurrent/cached_thread_pool.rb,
lib/concurrent/global_thread_pool.rb,
lib/concurrent/immediate_executor.rb,
lib/concurrent/safe_task_executor.rb,
lib/concurrent/fixed_thread_pool/worker.rb,
lib/concurrent/cached_thread_pool/worker.rb,
lib/concurrent/copy_on_write_observer_set.rb,
lib/concurrent/copy_on_notify_observer_set.rb
Overview
Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.
The design goals of this gem are:
- Stay true to the spirit of the languages providing inspiration
- But implement in a way that makes sense for Ruby
- Keep the semantics as idiomatic Ruby as possible
- Support features that make sense in Ruby
- Exclude features that don’t make sense in Ruby
- Be small, lean, and loosely coupled
Defined Under Namespace
Modules: Dereferenceable, Obligation, Postable, Runnable, Stoppable, ThreadLocalJavaStorage, ThreadLocalNewStorage, ThreadLocalOldStorage, ThreadLocalSymbolAllocator, UsesGlobalThreadPool
Classes: Actor, Agent, AtomicFixnum, CachedThreadPool, Channel, Condition, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, Event, FixedThreadPool, Future, IVar, ImmediateExecutor, MVar, NullThreadPool, Promise, SafeTaskExecutor, ScheduledTask, Supervisor, ThreadLocalVar, TimerTask
Constant Summary
- MultipleAssignmentError =
Class.new(StandardError)
- VERSION =
'0.5.0'
- TimeoutError =
Error raised when an operation times out.
Class.new(StandardError)
Class Method Summary
- .dataflow(*inputs) {|inputs| ... } ⇒ Object
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
- .timeout(seconds) ⇒ Object
Wait the given number of seconds for the block operation to complete.
Class Method Details
.dataflow(*inputs) {|inputs| ... } ⇒ Object
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.
Our syntax is somewhat related to that of Akka’s flow and Habanero Java’s DataDrivenFuture. However, unlike Akka, we don’t schedule a task at all until it is ready to run, and unlike Habanero Java, we pass the data values into the task instead of dereferencing them again in the task.
The theory of dataflow goes back to the 1980s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic, in that you can create more tasks within other tasks.
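To make the task-graph idea concrete, here is a minimal, stdlib-only sketch. `sketch_dataflow` is a hypothetical stand-in and not the gem's API. It models each task as a plain `Thread` that starts immediately and blocks on its inputs through `Thread#value`, which is the eager scheduling the gem is described as avoiding. It is only meant to illustrate how tasks compose and how dependency values arrive in the block as arguments.

```ruby
# Hypothetical stand-in for Concurrent::dataflow, built only on ::Thread.
# Unlike the gem, it starts each task right away and blocks on its inputs.
def sketch_dataflow(*inputs, &block)
  raise ArgumentError, 'no block given' unless block

  Thread.new do
    # Thread#value joins the dependency and returns its block's result.
    values = inputs.map(&:value)
    block.call(*values)
  end
end

# A small dependency graph computing Fibonacci numbers.
f0 = sketch_dataflow { 0 }
f1 = sketch_dataflow { 1 }
f2 = sketch_dataflow(f0, f1) { |a, b| a + b }
f3 = sketch_dataflow(f1, f2) { |a, b| a + b }
f4 = sketch_dataflow(f2, f3) { |a, b| a + b }

puts f4.value # prints 3
```

Each node names the nodes it depends on, and the block never dereferences them itself. That is the same shape of call the documented `dataflow` method accepts.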
# File 'lib/concurrent/dataflow.rb', line 62

def dataflow(*inputs, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new do
    values = inputs.map { |input| input.value }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
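The listing above relies on a `DependencyCounter` that is not shown on this page. The following is a minimal sketch of what such an observer might look like, assuming it counts down once per completed input and runs its block when the count reaches zero. The class name `SketchDependencyCounter`, the `Mutex`-based locking, and the ignored `update` arguments are all assumptions for illustration and may differ from the gem's actual class.

```ruby
# Hypothetical stand-in for the DependencyCounter used by dataflow.
class SketchDependencyCounter
  def initialize(count, &block)
    @remaining = count
    @block = block
    @lock = Mutex.new
  end

  # Observer callback. The arguments an input would pass on completion
  # are ignored here; only the number of calls matters.
  def update(*_args)
    ready = @lock.synchronize do
      @remaining -= 1
      @remaining.zero?
    end
    # Run the block outside the lock, exactly once, on the final update.
    @block.call if ready
  end
end

fired = []
counter = SketchDependencyCounter.new(3) { fired << :ran }

2.times { counter.update }
puts fired.inspect # prints []

counter.update
puts fired.inspect # prints [:ran]
```

Because the block fires only on the last notification, the dependent task is never scheduled while any input is still pending.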
.timeout(seconds) ⇒ Object
Wait the given number of seconds for the block operation to complete.
This method is intended to be a simpler and more reliable replacement for the Ruby standard library Timeout::timeout method.
# File 'lib/concurrent/utilities.rb', line 19

def timeout(seconds)

  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end
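A short usage sketch may help show the two possible outcomes. So that it runs without the gem installed, the example restates the method from the listing above inside a hypothetical `SketchTimeout` module with its own `TimeoutError` constant. Both names are assumptions for illustration; in the gem the method and constant live under `Concurrent`.

```ruby
# Hypothetical module restating the listing above so the example is standalone.
module SketchTimeout
  TimeoutError = Class.new(StandardError)

  def self.timeout(seconds)
    # Run the block on a worker thread and stash its result in a thread local.
    thread = Thread.new { Thread.current[:result] = yield }

    # Thread#join returns nil if the limit elapses before the thread finishes.
    if thread.join(seconds)
      thread[:result]
    else
      raise TimeoutError
    end
  ensure
    # Always clean up the worker, whether it finished or not.
    Thread.kill(thread) unless thread.nil?
  end
end

# The block finishes in time, so its value is returned.
fast = SketchTimeout.timeout(1) { 21 * 2 }
puts fast # prints 42

# The block overruns the limit, so TimeoutError is raised in the caller.
slow = begin
  SketchTimeout.timeout(0.05) { sleep 1; :finished }
rescue SketchTimeout::TimeoutError
  :timed_out
end
puts slow # prints timed_out
```

Note that the error is raised in the calling thread, while the worker thread is killed in the `ensure` clause.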