Module: Concurrent
- Extended by:
- Logging
- Defined in:
- lib/concurrent.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/tvar.rb,
lib/concurrent/actor.rb,
lib/concurrent/agent.rb,
lib/concurrent/async.rb,
lib/concurrent/delay.rb,
lib/extension_helper.rb,
lib/concurrent/errors.rb,
lib/concurrent/future.rb,
lib/concurrent/logging.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/exchanger.rb,
lib/concurrent/actor/core.rb,
lib/concurrent/actor/root.rb,
lib/concurrent/obligation.rb,
lib/concurrent/observable.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/actor/utils.rb,
lib/concurrent/actor/errors.rb,
lib/concurrent/atomic/event.rb,
lib/concurrent/actor/context.rb,
lib/concurrent/configuration.rb,
lib/concurrent/lazy_register.rb,
lib/concurrent/utility/timer.rb,
lib/concurrent/actor/envelope.rb,
lib/concurrent/options_parser.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/actor/behaviour.rb,
lib/concurrent/actor/reference.rb,
lib/concurrent/channel/channel.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/utility/timeout.rb,
lib/concurrent/actor/type_check.rb,
lib/concurrent/actor/utils/pool.rb,
lib/concurrent/atomic/condition.rb,
lib/concurrent/executor/executor.rb,
lib/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent/executor/timer_set.rb,
lib/concurrent/actor/utils/balancer.rb,
lib/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent/atomic_reference/rbx.rb,
lib/concurrent/actor/utils/broadcast.rb,
lib/concurrent/atomic/atomic_boolean.rb,
lib/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent/atomic_reference/ruby.rb,
lib/concurrent/channel/waitable_list.rb,
lib/concurrent/actor/behaviour/awaits.rb,
lib/concurrent/actor/behaviour/buffer.rb,
lib/concurrent/atomic/synchronization.rb,
lib/concurrent/atomic_reference/jruby.rb,
lib/concurrent/collection/ring_buffer.rb,
lib/concurrent/actor/behaviour/linking.rb,
lib/concurrent/actor/behaviour/pausing.rb,
lib/concurrent/atomic/count_down_latch.rb,
lib/concurrent/atomic/thread_local_var.rb,
lib/concurrent/utility/processor_count.rb,
lib/concurrent/actor/behaviour/abstract.rb,
lib/concurrent/actor/public_delegations.rb,
lib/concurrent/channel/buffered_channel.rb,
lib/concurrent/collection/priority_queue.rb,
lib/concurrent/actor/behaviour/supervised.rb,
lib/concurrent/actor/internal_delegations.rb,
lib/concurrent/channel/unbuffered_channel.rb,
lib/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent/actor/behaviour/supervising.rb,
lib/concurrent/actor/behaviour/termination.rb,
lib/concurrent/executor/cached_thread_pool.rb,
lib/concurrent/executor/immediate_executor.rb,
lib/concurrent/executor/safe_task_executor.rb,
lib/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent/executor/per_thread_executor.rb,
lib/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent/executor/serialized_execution.rb,
lib/concurrent/executor/thread_pool_executor.rb,
lib/concurrent/atomic_reference/direct_update.rb,
lib/concurrent/collection/blocking_ring_buffer.rb,
lib/concurrent/executor/java_fixed_thread_pool.rb,
lib/concurrent/executor/ruby_fixed_thread_pool.rb,
lib/concurrent/executor/single_thread_executor.rb,
lib/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent/executor/java_cached_thread_pool.rb,
lib/concurrent/executor/ruby_cached_thread_pool.rb,
lib/concurrent/executor/ruby_thread_pool_worker.rb,
lib/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent/atomic/copy_on_write_observer_set.rb,
lib/concurrent/atomic/copy_on_notify_observer_set.rb,
lib/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent/actor/behaviour/terminates_children.rb,
lib/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent/atomic_reference/concurrent_update_error.rb,
lib/concurrent/actor/behaviour/errors_on_unknown_message.rb
Overview
Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.
The design goals of this gem are:
- Stay true to the spirit of the languages providing inspiration
- But implement in a way that makes sense for Ruby
- Keep the semantics as idiomatic Ruby as possible
- Support features that make sense in Ruby
- Exclude features that don't make sense in Ruby
- Be small, lean, and loosely coupled
Defined Under Namespace
Modules: Actor, Async, AtomicDirectUpdate, AtomicNumericCompareAndSetWrapper, Channel, Dereferenceable, Executor, JavaExecutor, Logging, Obligation, Observable, OptionsParser, RubyExecutor, SerialExecutor, Synchronization
Classes: AbstractThreadLocalVar, Agent, Atomic, AtomicBoolean, AtomicFixnum, BlockingRingBuffer, BufferedChannel, CAtomic, CAtomicBoolean, CAtomicFixnum, CachedThreadPool, ConcurrentUpdateError, Condition, Configuration, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, IVar, ImmediateExecutor, IndirectImmediateExecutor, JavaAtomic, JavaAtomicBoolean, JavaAtomicFixnum, JavaCachedThreadPool, JavaCountDownLatch, JavaFixedThreadPool, JavaPriorityQueue, JavaSingleThreadExecutor, JavaThreadPoolExecutor, LazyRegister, MVar, MutexAtomic, MutexAtomicBoolean, MutexAtomicFixnum, MutexCountDownLatch, MutexPriorityQueue, PerThreadExecutor, PriorityQueue, ProcessorCounter, Promise, RbxAtomic, RingBuffer, RubyCachedThreadPool, RubyFixedThreadPool, RubySingleThreadExecutor, RubyThreadPoolExecutor, SafeTaskExecutor, ScheduledTask, SerializedExecution, SerializedExecutionDelegator, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, TimerSet, TimerTask, Transaction, UnbufferedChannel, WaitableList
Constant Summary
- ConfigurationError = Class.new(StandardError)
Raised when errors occur during configuration.
- LifecycleError = Class.new(StandardError)
Raised when a lifecycle method (such as `stop`) is called in an improper sequence or when the object is in an inappropriate state.
- InitializationError = Class.new(StandardError)
Raised when an object's methods are called when it has not been properly initialized.
- MaxRestartFrequencyError = Class.new(StandardError)
Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.
- MultipleAssignmentError = Class.new(StandardError)
Raised when an attempt is made to modify an immutable object (such as an `IVar`) after its final state has been set.
- RejectedExecutionError = Class.new(StandardError)
Raised by an `Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.
- TimeoutError = Class.new(StandardError)
Raised when an operation times out.
- Actress = Concurrent::Actor
- VERSION = '0.7.1'
Class Method Summary
- .abort_transaction ⇒ Object
Abort a currently running transaction - see `Concurrent::atomically`.
- .atomically ⇒ Object
Run a block that reads and writes `TVar`s as a single atomic transaction.
- .call_dataflow(method, executor, *inputs, &block) ⇒ Object
- .configuration ⇒ Configuration
- .configure {|the| ... } ⇒ Object
Perform gem-level configuration.
- .dataflow(*inputs) {|inputs| ... } ⇒ Object
- .dataflow!(*inputs, &block) ⇒ Object
- .dataflow_with(executor, *inputs, &block) ⇒ Object
- .dataflow_with!(executor, *inputs, &block) ⇒ Object
- .finalize_global_executors ⇒ Object
- .physical_processor_count ⇒ Object
- .processor_count ⇒ Object
- .timeout(seconds) ⇒ Object
Wait the given number of seconds for the block operation to complete.
- .timer(seconds, *args) { ... } ⇒ Boolean
Perform the given operation asynchronously after the given number of seconds.
Instance Method Summary
- #compare_and_set ⇒ Boolean
Atomically sets the value to the given updated value if the current value == the expected value.
- #decrement ⇒ Fixnum
Decreases the current value by 1.
- #false? ⇒ Boolean
Is the current value `false`?
- #increment ⇒ Fixnum
Increases the current value by 1.
- #initialize ⇒ Object
Creates a new `AtomicBoolean` with the given initial value.
- #make_false ⇒ Boolean
Explicitly sets the value to false.
- #make_true ⇒ Boolean
Explicitly sets the value to true.
- #true? ⇒ Boolean
Is the current value `true`?
- #value ⇒ Boolean
Retrieves the current `Boolean` value.
- #value= ⇒ Boolean
Explicitly sets the value.
Methods included from Logging
Class Method Details
.abort_transaction ⇒ Object
Abort a currently running transaction - see `Concurrent::atomically`.
# File 'lib/concurrent/tvar.rb', line 140
def abort_transaction
  raise Transaction::AbortError.new
end
.atomically ⇒ Object
Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not; consistent, in that the `TVar` objects involved will never enter an illegal state; and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
- Most importantly, the block that you pass to `atomically` may be executed more than once. In most cases your code should be free of side effects other than reads and writes of `TVar` objects.
- If an exception escapes an `atomically` block, it will abort the transaction.
- It is undefined behaviour to use `callcc` or `Fiber` with `atomically`.
- If you create a new thread within an `atomically` block, it will not be part of the transaction. Creating a thread counts as a side effect.
Transactions within transactions are flattened to a single transaction.
# File 'lib/concurrent/tvar.rb', line 86
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction
  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do
        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue => e
          transaction.abort
          raise e
        end

        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end
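The retry loop above is the reason a block passed to `atomically` may run more than once. The following is a minimal sketch of that optimistic-retry idea using only the Ruby standard library. `ToyCell` and `toy_atomically` are hypothetical names invented for this illustration; they are not part of the gem and do not reproduce how `TVar` or `Transaction` are implemented.

```ruby
# Hypothetical stand-in for a transactional variable: a value plus a
# version number that changes on every committed write.
class ToyCell
  def initialize(value)
    @value = value
    @version = 0
    @lock = Mutex.new
  end

  # Read the value and its version together.
  def snapshot
    @lock.synchronize { [@value, @version] }
  end

  # Commit new_value only if nobody has written since expected_version.
  def commit(expected_version, new_value)
    @lock.synchronize do
      return false unless @version == expected_version
      @value = new_value
      @version += 1
      true
    end
  end
end

# Optimistic retry loop: rerun the block until its write commits cleanly.
# Returns the number of attempts that were needed.
def toy_atomically(cell)
  attempts = 0
  loop do
    attempts += 1
    value, version = cell.snapshot
    new_value = yield(value, attempts)
    return attempts if cell.commit(version, new_value)
  end
end

cell = ToyCell.new(10)
attempts = toy_atomically(cell) do |value, attempt|
  # Simulate a conflicting writer during the first attempt only.
  cell.commit(cell.snapshot.last, 100) if attempt == 1
  value + 1
end
final_value = cell.snapshot.first
```

In this sketch the first attempt loses to the simulated conflicting write, so the block runs a second time against the fresh value. Any side effect inside the block would therefore happen twice, which is why such blocks should avoid side effects.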
.call_dataflow(method, executor, *inputs, &block) ⇒ Object
# File 'lib/concurrent/dataflow.rb', line 56
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
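The listing above registers one counter as an observer on every input and executes the result only after all inputs have reported. A minimal sketch of that countdown pattern, using only the standard library, might look as follows. `ToyDependencyCounter` is a hypothetical name for this illustration; the gem's own `DependencyCounter` is not shown on this page and may differ.

```ruby
# Hypothetical countdown: runs the block once `count` updates have arrived.
class ToyDependencyCounter
  def initialize(count, &block)
    @remaining = count
    @block = block
    @lock = Mutex.new
  end

  # Observer-style callback; the arguments are ignored in this sketch.
  def update(*_args)
    # Decrement under the lock so that exactly one caller sees zero.
    fire = @lock.synchronize { (@remaining -= 1).zero? }
    @block.call if fire
  end
end

fired = []
counter = ToyDependencyCounter.new(3) { fired << :ran }

2.times { counter.update }
fired_after_two = fired.dup   # still empty: one dependency outstanding

counter.update
fired_after_three = fired.dup # the block has now run exactly once
```

The block is called outside the lock so that a slow continuation does not hold up other observers.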
.configuration ⇒ Configuration
# File 'lib/concurrent/configuration.rb', line 124
def self.configuration
  @configuration.value
end
.configure {|the| ... } ⇒ Object
Perform gem-level configuration.
# File 'lib/concurrent/configuration.rb', line 132
def self.configure
  yield(configuration)
end
.dataflow(*inputs) {|inputs| ... } ⇒ Object
# File 'lib/concurrent/dataflow.rb', line 34
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
end
.dataflow!(*inputs, &block) ⇒ Object
# File 'lib/concurrent/dataflow.rb', line 44
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block)
end
.dataflow_with(executor, *inputs, &block) ⇒ Object
# File 'lib/concurrent/dataflow.rb', line 39
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
.dataflow_with!(executor, *inputs, &block) ⇒ Object
# File 'lib/concurrent/dataflow.rb', line 49
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
.finalize_global_executors ⇒ Object
# File 'lib/concurrent/configuration.rb', line 136
def self.finalize_global_executors
  self.finalize_executor(self.configuration.global_timer_set)
  self.finalize_executor(self.configuration.global_task_pool)
  self.finalize_executor(self.configuration.global_operation_pool)
end
.physical_processor_count ⇒ Object
# File 'lib/concurrent/utility/processor_count.rb', line 148
def self.physical_processor_count
  processor_counter.physical_processor_count
end
.processor_count ⇒ Object
# File 'lib/concurrent/utility/processor_count.rb', line 144
def self.processor_count
  processor_counter.processor_count
end
.timeout(seconds) ⇒ Object
Wait the given number of seconds for the block operation to complete.
This method is intended to be a simpler and more reliable replacement for the Ruby standard library `Timeout::timeout` method.
# File 'lib/concurrent/utility/timeout.rb', line 19
def timeout(seconds)
  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end
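The listing above relies on `Thread#join` with a time limit, which returns `nil` when the limit elapses before the thread finishes. A standalone sketch of the same technique, runnable without the gem, is shown here. `toy_timeout` and `ToyTimeoutError` are hypothetical names chosen so the sketch does not collide with `Concurrent::TimeoutError` or the standard library's `Timeout` module.

```ruby
# Hypothetical error class for this sketch only.
class ToyTimeoutError < StandardError; end

# Run the block in a worker thread and wait at most `seconds` for it.
def toy_timeout(seconds)
  worker = Thread.new { Thread.current[:result] = yield }

  # Thread#join(limit) returns the thread on completion, nil on timeout.
  if worker.join(seconds)
    worker[:result]
  else
    raise ToyTimeoutError, "block did not finish within #{seconds}s"
  end
ensure
  # Stop the worker if it is still running; harmless if it already finished.
  worker.kill if worker
end

fast = toy_timeout(1) { 6 * 7 }

slow = begin
  toy_timeout(0.05) { sleep 1; :finished }
rescue ToyTimeoutError
  :timed_out
end
```

Note that killing a thread mid-operation can leave shared state half-updated, so this pattern fits best when the block only computes a value.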
.timer(seconds, *args) { ... } ⇒ Boolean
Perform the given operation asynchronously after the given number of seconds.
# File 'lib/concurrent/utility/timer.rb', line 13
def timer(seconds, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0

  Concurrent.configuration.global_timer_set.post(seconds, *args, &block)
  true
end
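To illustrate the argument checks and the delayed, asynchronous call in the listing above, here is a rough standard-library sketch. `toy_timer` is a hypothetical helper that spawns one thread per call and returns that thread; the gem instead posts the block to its global timer set and returns `true`, so this is an approximation of the behaviour, not of the implementation.

```ruby
# Hypothetical one-shot timer: validates arguments, then runs the block
# with *args on a background thread after `seconds` have passed.
def toy_timer(seconds, *args, &block)
  raise ArgumentError, 'no block given' unless block
  raise ArgumentError, 'interval must be greater than or equal to zero' if seconds < 0

  Thread.new do
    sleep(seconds)
    block.call(*args)
  end
end

results = Queue.new
toy_timer(0.01, 2, 3) { |a, b| results << a * b }
product = results.pop # blocks until the delayed block has run

negative_rejected = begin
  toy_timer(-1) { :never }
  false
rescue ArgumentError
  true
end
```

The extra positional arguments are handed to the block when it finally runs, mirroring how `*args` is forwarded in the listing.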
Instance Method Details
#compare_and_set ⇒ Boolean
Atomically sets the value to the given updated value if the current value == the expected value.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 189
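The compare-and-set contract described above (write the update only when the current value equals the expected value, and report whether the write happened) can be sketched with a mutex. `ToyAtomicCounter` is a hypothetical class for illustration and is an assumption about the general technique, not a copy of the gem's `AtomicFixnum` implementations.

```ruby
# Hypothetical mutex-guarded counter illustrating compare-and-set.
class ToyAtomicCounter
  def initialize(initial = 0)
    @value = initial
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Increase the value by 1 and return the new value.
  def increment
    @lock.synchronize { @value += 1 }
  end

  # Set the value to `update` only if it currently equals `expected`.
  # Returns true when the write happened, false otherwise.
  def compare_and_set(expected, update)
    @lock.synchronize do
      return false unless @value == expected
      @value = update
      true
    end
  end
end

counter = ToyAtomicCounter.new(0)
threads = 4.times.map { Thread.new { 250.times { counter.increment } } }
threads.each(&:join)
total = counter.value

swapped = counter.compare_and_set(1000, 0) # current value matches
stale = counter.compare_and_set(1000, 5)   # value is now 0, so no write
```

Because the comparison and the write happen under one lock, no other thread can change the value between the check and the update.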
#decrement ⇒ Fixnum
Decreases the current value by 1.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 186
#false? ⇒ Boolean
Is the current value `false`?
# File 'lib/concurrent/atomic/atomic_boolean.rb', line 182
#increment ⇒ Fixnum
Increases the current value by 1.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 183
#initialize ⇒ Object
Creates a new `AtomicBoolean` with the given initial value.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 174
#make_false ⇒ Boolean
Explicitly sets the value to false.
# File 'lib/concurrent/atomic/atomic_boolean.rb', line 188
#make_true ⇒ Boolean
Explicitly sets the value to true.
# File 'lib/concurrent/atomic/atomic_boolean.rb', line 185
#true? ⇒ Boolean
Is the current value `true`?
# File 'lib/concurrent/atomic/atomic_boolean.rb', line 179
#value ⇒ Boolean
Retrieves the current `Boolean` value.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 177
#value= ⇒ Boolean
Explicitly sets the value.
# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 180