Class: Concurrent::Edge::Event

Inherits:
Synchronization::Object
  • Object
show all
Includes:
Concern::Deprecation
Defined in:
lib/concurrent/edge/future.rb

Overview

Represents an event which will happen in the future (i.e. it will be completed). It always has to happen.

Direct Known Subclasses

CompletableEvent, Future

Instance Method Summary collapse

Constructor Details

#initialize(promise, default_executor) ⇒ Event

Returns a new instance of Event.



168
169
170
171
172
173
174
175
176
177
# File 'lib/concurrent/edge/future.rb', line 168

# Stores the promise and the default executor, then sets up the event's
# initial bookkeeping: PENDING state, a false "touched" flag, and empty
# callback and waiter stacks.
#
# @param promise the promise kept in @Promise
# @param default_executor the executor kept in @DefaultExecutor
def initialize(promise, default_executor)
  @Promise = promise
  @DefaultExecutor = default_executor
  @State = AtomicReference.new(PENDING)
  @Touched = AtomicBoolean.new(false)
  @Callbacks = LockFreeStack.new
  # TODO replace with AtomicFixnum, avoid aba problem
  @Waiters = LockFreeStack.new
  # All ivars are assigned before the superclass initializer and the
  # visibility call run; keep these two calls last.
  super()
  ensure_ivar_visibility!
end

Instance Method Details

#chain(executor = nil) {|success, value, reason| ... } ⇒ Object Also known as: then

Yields:

  • (success, value, reason)

    of the parent



230
231
232
# File 'lib/concurrent/edge/future.rb', line 230

# Builds a ChainPromise on top of this event and returns its future.
#
# @param executor executor handed to the ChainPromise; falls back to
#   @DefaultExecutor when nil (or false)
# @yield [success, value, reason] of the parent
# @return the object returned by ChainPromise#future
def chain(executor = nil, &callback)
  chosen_executor = executor || @DefaultExecutor
  promise = ChainPromise.new(self, @DefaultExecutor, chosen_executor, &callback)
  promise.future
end

#chain_completable(completable_event) ⇒ Object Also known as: tangle



236
237
238
# File 'lib/concurrent/edge/future.rb', line 236

# Registers a synchronous completion callback that calls
# `complete_with COMPLETED` on the given completable event.
#
# @param completable_event object responding to #complete_with
# @return whatever #on_completion! returns
def chain_completable(completable_event)
  on_completion! do
    completable_event.complete_with(COMPLETED)
  end
end

#completed?(state = @State.get) ⇒ Boolean Also known as: complete?

Has the Event been completed?

Returns:

  • (Boolean)


198
199
200
# File 'lib/concurrent/edge/future.rb', line 198

# Has the Event been completed?
#
# @param state state object to query; defaults to the current value of @State
# @return [Boolean] the result of `state.completed?`
def completed?(state = @State.get)
  observed = state
  observed.completed?
end

#default_executor ⇒ Executor

Returns current default executor.

Returns:

  • (Executor)

    current default executor

See Also:



225
226
227
# File 'lib/concurrent/edge/future.rb', line 225

# Reader for the executor stored in @DefaultExecutor, which the other
# methods of this class use as the fallback executor.
#
# @return [Executor] current default executor
def default_executor
  @DefaultExecutor
end

#delay ⇒ Event

Inserts delay into the chain of Futures making rest of it lazy evaluated.

Returns:



256
257
258
# File 'lib/concurrent/edge/future.rb', line 256

# Inserts delay into the chain of Futures making rest of it lazy evaluated.
#
# Zips this event with the event of a fresh Delay created on
# @DefaultExecutor and returns the event of the zipping promise.
#
# @return [Event] the object returned by ZipEventEventPromise#event
def delay
  executor = @DefaultExecutor
  delayed_event = Delay.new(executor).event
  ZipEventEventPromise.new(self, delayed_event, executor).event
end

#inspect ⇒ Object



298
299
300
# File 'lib/concurrent/edge/future.rb', line 298

# Extends #to_s with the list of objects returned by #blocks.
#
# The last character of #to_s (its closing ">") is dropped and re-added
# after the " blocks:[...]" section.
#
# @return [String]
def inspect
  prefix = to_s[0..-2]
  blocked = blocks.map { |item| item.to_s }.join(', ')
  "#{prefix} blocks:[#{blocked}]>"
end

#on_completion(executor = nil) {|success, value, reason| ... } ⇒ Object

Returns self.

Yields:

  • (success, value, reason)

    executed asynchronously on `executor` when completed

Returns:

  • self



278
279
280
# File 'lib/concurrent/edge/future.rb', line 278

# Registers a callback of kind :pr_async_callback_on_completion.
#
# @param executor executor passed along with the callback; falls back to
#   @DefaultExecutor when nil (or false)
# @yield [success, value, reason] executed async on `executor` when completed
# @return the result of #add_callback (documented as self)
def on_completion(executor = nil, &callback)
  run_on = executor || @DefaultExecutor
  add_callback(:pr_async_callback_on_completion, run_on, callback)
end

#on_completion! {|success, value, reason| ... } ⇒ Object

Returns self.

Yields:

  • (success, value, reason)

    executed sync when completed

Returns:

  • self



284
285
286
# File 'lib/concurrent/edge/future.rb', line 284

# Registers a callback of kind :pr_callback_on_completion.
#
# @yield [success, value, reason] executed sync when completed
# @return the result of #add_callback (documented as self)
def on_completion!(&callback)
  add_callback(:pr_callback_on_completion, callback)
end

#pending?(state = @State.get) ⇒ Boolean Also known as: incomplete?

Is Event/Future pending?

Returns:

  • (Boolean)


186
187
188
# File 'lib/concurrent/edge/future.rb', line 186

# Is Event/Future pending?
#
# @param state state object to query; defaults to the current value of @State
# @return [Boolean] the negation of `state.completed?`
def pending?(state = @State.get)
  completed = state.completed?
  !completed
end

#set(*args, &block) ⇒ Object



302
303
304
305
# File 'lib/concurrent/edge/future.rb', line 302

# Always raises: an Event cannot be set directly. The message points the
# caller at CompletableEvent#complete / CompletableFuture#complete.
#
# @param args ignored
# @raise [RuntimeError] unconditionally
def set(*args, &block)
  message = 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' \
            'constructed by Concurrent.event or Concurrent.future respectively.'
  raise RuntimeError, message
end

#state ⇒ :pending, :completed

Returns:

  • (:pending, :completed)


180
181
182
# File 'lib/concurrent/edge/future.rb', line 180

# Reads the current state object from @State and converts it to a symbol.
#
# @return [:pending, :completed]
def state
  current = @State.get
  current.to_sym
end

#then_select(*channels) ⇒ Future

Zips with the selected value from the supplied channels

Returns:



272
273
274
# File 'lib/concurrent/edge/future.rb', line 272

# Zips this event with the result of Concurrent.select over the supplied
# channels.
#
# Fix: the promise must be instantiated with `.new`. The previous
# `ZipFutureEventPromise(...)` form was parsed as a call to a method of
# that name and raised NoMethodError.
#
# @param channels channels forwarded to Concurrent.select
# @return [Future] the object returned by ZipFutureEventPromise#future
def then_select(*channels)
  selected = Concurrent.select(*channels)
  ZipFutureEventPromise.new(selected, self, @DefaultExecutor).future
end

#to_s ⇒ Object



294
295
296
# File 'lib/concurrent/edge/future.rb', line 294

# Short textual form: class name, hexadecimal `object_id << 1`, and the
# current state, wrapped in "<#...>".
#
# @return [String]
def to_s
  format('<#%s:0x%x %s>', self.class, object_id << 1, state.to_sym)
end

#unscheduled? ⇒ Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/concurrent/edge/future.rb', line 190

# Not supported on this class: always raises.
#
# @raise [RuntimeError] with the message 'unsupported'
def unscheduled?
  raise RuntimeError, 'unsupported'
end

#wait(timeout = nil) ⇒ Event, ...

Note:

A thread should wait only once! For repeated checking use the faster `completed?` check. If a thread waits periodically, it will dangerously grow the waiters stack.

Wait until Event is #complete?

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Event, true, false)

    self or true/false if timeout is used



210
211
212
213
214
# File 'lib/concurrent/edge/future.rb', line 210

# Wait until Event is #complete?
#
# Calls #touch first, then delegates the waiting to #wait_until_complete.
#
# @param timeout [Numeric, nil] the maximum time in seconds to wait
# @return [Event, true, false] self when no timeout is given, otherwise
#   the result of #wait_until_complete
def wait(timeout = nil)
  touch
  outcome = wait_until_complete(timeout)
  return self unless timeout

  outcome
end

#with_default_executor(executor) ⇒ Event

Changes default executor for rest of the chain

Returns:



290
291
292
# File 'lib/concurrent/edge/future.rb', line 290

# Changes default executor for rest of the chain.
#
# Wraps this event in an EventWrapperPromise built with the given executor.
#
# @param executor the executor handed to EventWrapperPromise
# @return [Event] the object returned by EventWrapperPromise#future
def with_default_executor(executor)
  wrapper = EventWrapperPromise.new(self, executor)
  wrapper.future
end

#zip(other) ⇒ Event Also known as: &

Zip with future producing new Future

Returns:



244
245
246
247
248
249
250
# File 'lib/concurrent/edge/future.rb', line 244

# Zips this event with another Event or Future.
#
# Fix: the type check uses `is_a?`. The previous `other.is?(Future)` called
# a method that does not exist and raised NoMethodError on every call.
#
# @param other [Event, Future] the object to zip with
# @return the result of ZipFutureEventPromise#future when `other` is a
#   Future, otherwise the result of ZipEventEventPromise#future
def zip(other)
  if other.is_a?(Future)
    # Argument order differs between branches: the Future goes first here.
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end