Module: Concurrent

Extended by:
Logging
Defined in:
lib/concurrent.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/tvar.rb,
lib/concurrent/actor.rb,
lib/concurrent/agent.rb,
lib/concurrent/async.rb,
lib/concurrent/delay.rb,
lib/extension_helper.rb,
lib/concurrent/errors.rb,
lib/concurrent/future.rb,
lib/concurrent/logging.rb,
lib/concurrent/promise.rb,
lib/concurrent/version.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/exchanger.rb,
lib/concurrent/actor/core.rb,
lib/concurrent/actor/root.rb,
lib/concurrent/obligation.rb,
lib/concurrent/observable.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/actor/utils.rb,
lib/concurrent/actor/errors.rb,
lib/concurrent/atomic/event.rb,
lib/concurrent/actor/context.rb,
lib/concurrent/configuration.rb,
lib/concurrent/lazy_register.rb,
lib/concurrent/utility/timer.rb,
lib/concurrent/actor/envelope.rb,
lib/concurrent/options_parser.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/actor/behaviour.rb,
lib/concurrent/actor/reference.rb,
lib/concurrent/channel/channel.rb,
lib/concurrent/dereferenceable.rb,
lib/concurrent/utility/timeout.rb,
lib/concurrent/actor/type_check.rb,
lib/concurrent/actor/utils/pool.rb,
lib/concurrent/atomic/condition.rb,
lib/concurrent/executor/executor.rb,
lib/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent/executor/timer_set.rb,
lib/concurrent/actor/utils/balancer.rb,
lib/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent/atomic_reference/rbx.rb,
lib/concurrent/actor/utils/broadcast.rb,
lib/concurrent/atomic/atomic_boolean.rb,
lib/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent/atomic_reference/ruby.rb,
lib/concurrent/channel/waitable_list.rb,
lib/concurrent/actor/behaviour/awaits.rb,
lib/concurrent/actor/behaviour/buffer.rb,
lib/concurrent/atomic/synchronization.rb,
lib/concurrent/atomic_reference/jruby.rb,
lib/concurrent/collection/ring_buffer.rb,
lib/concurrent/actor/behaviour/linking.rb,
lib/concurrent/actor/behaviour/pausing.rb,
lib/concurrent/atomic/count_down_latch.rb,
lib/concurrent/atomic/thread_local_var.rb,
lib/concurrent/utility/processor_count.rb,
lib/concurrent/actor/behaviour/abstract.rb,
lib/concurrent/actor/public_delegations.rb,
lib/concurrent/channel/buffered_channel.rb,
lib/concurrent/collection/priority_queue.rb,
lib/concurrent/actor/behaviour/supervised.rb,
lib/concurrent/actor/internal_delegations.rb,
lib/concurrent/channel/unbuffered_channel.rb,
lib/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent/actor/behaviour/supervising.rb,
lib/concurrent/actor/behaviour/termination.rb,
lib/concurrent/executor/cached_thread_pool.rb,
lib/concurrent/executor/immediate_executor.rb,
lib/concurrent/executor/safe_task_executor.rb,
lib/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent/executor/per_thread_executor.rb,
lib/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent/executor/serialized_execution.rb,
lib/concurrent/executor/thread_pool_executor.rb,
lib/concurrent/atomic_reference/direct_update.rb,
lib/concurrent/collection/blocking_ring_buffer.rb,
lib/concurrent/executor/java_fixed_thread_pool.rb,
lib/concurrent/executor/ruby_fixed_thread_pool.rb,
lib/concurrent/executor/single_thread_executor.rb,
lib/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent/executor/java_cached_thread_pool.rb,
lib/concurrent/executor/ruby_cached_thread_pool.rb,
lib/concurrent/executor/ruby_thread_pool_worker.rb,
lib/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent/atomic/copy_on_write_observer_set.rb,
lib/concurrent/atomic/copy_on_notify_observer_set.rb,
lib/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent/actor/behaviour/terminates_children.rb,
lib/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent/atomic_reference/concurrent_update_error.rb,
lib/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.

The design goals of this gem are:

  • Stay true to the spirit of the languages providing inspiration

  • But implement in a way that makes sense for Ruby

  • Keep the semantics as idiomatic Ruby as possible

  • Support features that make sense in Ruby

  • Exclude features that don’t make sense in Ruby

  • Be small, lean, and loosely coupled

Defined Under Namespace

Modules: Actor, Async, AtomicDirectUpdate, AtomicNumericCompareAndSetWrapper, Channel, Dereferenceable, Executor, JavaExecutor, Logging, Obligation, Observable, OptionsParser, RubyExecutor, SerialExecutor, Synchronization Classes: AbstractThreadLocalVar, Agent, Atomic, AtomicBoolean, AtomicFixnum, BlockingRingBuffer, BufferedChannel, CAtomic, CAtomicBoolean, CAtomicFixnum, CachedThreadPool, ConcurrentUpdateError, Condition, Configuration, CopyOnNotifyObserverSet, CopyOnWriteObserverSet, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, IVar, ImmediateExecutor, IndirectImmediateExecutor, JavaAtomic, JavaAtomicBoolean, JavaAtomicFixnum, JavaCachedThreadPool, JavaCountDownLatch, JavaFixedThreadPool, JavaPriorityQueue, JavaSingleThreadExecutor, JavaThreadPoolExecutor, LazyRegister, MVar, MutexAtomic, MutexAtomicBoolean, MutexAtomicFixnum, MutexCountDownLatch, MutexPriorityQueue, PerThreadExecutor, PriorityQueue, ProcessorCounter, Promise, RbxAtomic, RingBuffer, RubyCachedThreadPool, RubyFixedThreadPool, RubySingleThreadExecutor, RubyThreadPoolExecutor, SafeTaskExecutor, ScheduledTask, SerializedExecution, SerializedExecutionDelegator, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, TimerSet, TimerTask, Transaction, UnbufferedChannel, WaitableList

Constant Summary collapse

ConfigurationError =

Raised when errors occur during configuration.

Class.new(StandardError)
LifecycleError =

Raised when a lifecycle method (such as ‘stop`) is called in an improper sequence or when the object is in an inappropriate state.

Class.new(StandardError)
InitializationError =

Raised when an object’s methods are called when it has not been properly initialized.

Class.new(StandardError)
MaxRestartFrequencyError =

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

Class.new(StandardError)
MultipleAssignmentError =

Raised when an attempt is made to modify an immutable object (such as an ‘IVar`) after its final state has been set.

Class.new(StandardError)
RejectedExecutionError =

Raised by an ‘Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.

Class.new(StandardError)
TimeoutError =

Raised when an operation times out.

Class.new(StandardError)
Actress =
Concurrent::Actor
VERSION =
'0.7.1'

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

log

Class Method Details

.abort_transactionObject

Abort a currently running transaction - see ‘Concurrent::atomically`.



140
141
142
# File 'lib/concurrent/tvar.rb', line 140

def abort_transaction
  raise Transaction::AbortError.new
end

.atomicallyObject

Run a block that reads and writes ‘TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

a = new TVar(100_000)
b = new TVar(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end

Raises:

  • (ArgumentError)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/concurrent/tvar.rb', line 86

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end

.call_dataflow(method, executor, *inputs, &block) ⇒ Object

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/concurrent/dataflow.rb', line 56

def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end

.configurationConfiguration

Returns:



124
125
126
# File 'lib/concurrent/configuration.rb', line 124

def self.configuration
  @configuration.value
end

.configure {|the| ... } ⇒ Object

Perform gem-level configuration.

Yields:

  • the configuration commands

Yield Parameters:



132
133
134
# File 'lib/concurrent/configuration.rb', line 132

def self.configure
  yield(configuration)
end

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Parameters:

  • inputs (Future)

    zero or more ‘Future` operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the ‘Future` inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not ‘IVar`s



34
35
36
# File 'lib/concurrent/dataflow.rb', line 34

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
end

.dataflow!(*inputs, &block) ⇒ Object



44
45
46
# File 'lib/concurrent/dataflow.rb', line 44

def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block)
end

.dataflow_with(executor, *inputs, &block) ⇒ Object



39
40
41
# File 'lib/concurrent/dataflow.rb', line 39

def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end

.dataflow_with!(executor, *inputs, &block) ⇒ Object



49
50
51
# File 'lib/concurrent/dataflow.rb', line 49

def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end

.finalize_global_executorsObject



136
137
138
139
140
# File 'lib/concurrent/configuration.rb', line 136

def self.finalize_global_executors
  self.finalize_executor(self.configuration.global_timer_set)
  self.finalize_executor(self.configuration.global_task_pool)
  self.finalize_executor(self.configuration.global_operation_pool)
end

.physical_processor_countObject



148
149
150
# File 'lib/concurrent/utility/processor_count.rb', line 148

def self.physical_processor_count
  processor_counter.physical_processor_count
end

.processor_countObject



144
145
146
# File 'lib/concurrent/utility/processor_count.rb', line 144

def self.processor_count
  processor_counter.processor_count
end

.timeout(seconds) ⇒ Object

Note:

This method is intended to be a simpler and more reliable replacement

Wait the given number of seconds for the block operation to complete.

to the Ruby standard library ‘Timeout::timeout` method.

Parameters:

  • seconds (Integer)

    The number of seconds to wait

Returns:

  • (Object)

    The result of the block operation

Raises:



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/concurrent/utility/timeout.rb', line 19

def timeout(seconds)

  thread = Thread.new do
    Thread.current[:result] = yield
  end
  success = thread.join(seconds)

  if success
    return thread[:result]
  else
    raise TimeoutError
  end
ensure
  Thread.kill(thread) unless thread.nil?
end

.timer(seconds, *args) { ... } ⇒ Boolean

Perform the given operation asynchronously after the given number of seconds.

Parameters:

  • seconds (Fixnum)

    the interval in seconds to wait before executing the task

Yields:

  • the task to execute

Returns:

  • (Boolean)

    true

Raises:

  • (ArgumentError)


13
14
15
16
17
18
19
# File 'lib/concurrent/utility/timer.rb', line 13

def timer(seconds, *args, &block)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0

  Concurrent.configuration.global_timer_set.post(seconds, *args, &block)
  true
end

Instance Method Details

#compare_and_setBoolean

Atomically sets the value to the given updated value if the current value == the expected value.

Parameters:

  • expect (Fixnum)

    the expected value

  • update (Fixnum)

    the new value

Returns:

  • (Boolean)

    true if the value was updated else false



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 189


#decrementFixnum

Decreases the current value by 1.

Returns:

  • (Fixnum)

    the current value after decrementation



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 186


#false?Boolean

Is the current value ‘false`

Returns:

  • (Boolean)

    true if the current value is ‘false`, else false



# File 'lib/concurrent/atomic/atomic_boolean.rb', line 182


#incrementFixnum

Increases the current value by 1.

Returns:

  • (Fixnum)

    the current value after incrementation



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 183


#initializeObject

Creates a new ‘AtomicBoolean` with the given initial value.

Parameters:

  • initial (Boolean)

    the initial value



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 174


#make_falseBoolean

Explicitly sets the value to false.

Returns:

  • (Boolean)

    true is value has changed, otherwise false



# File 'lib/concurrent/atomic/atomic_boolean.rb', line 188


#make_trueBoolean

Explicitly sets the value to true.

Returns:

  • (Boolean)

    true is value has changed, otherwise false



# File 'lib/concurrent/atomic/atomic_boolean.rb', line 185


#true?Boolean

Is the current value ‘true`

Returns:

  • (Boolean)

    true if the current value is ‘true`, else false



# File 'lib/concurrent/atomic/atomic_boolean.rb', line 179


#valueBoolean

Retrieves the current ‘Boolean` value.

Returns:

  • (Boolean)

    the current value



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 177


#value=Boolean

Explicitly sets the value.

Parameters:

  • value (Boolean)

    the new value to be set

Returns:

  • (Boolean)

    the current value



# File 'lib/concurrent/atomic/atomic_fixnum.rb', line 180