Class: Concurrent::Promises::Future

Inherits:
AbstractEventFuture show all
Includes:
ActorIntegration, NewChannelIntegration, ThrottleIntegration
Defined in:
lib/concurrent/edge/promises.rb,
lib/concurrent/edge/throttle.rb,
lib/concurrent/edge/old_channel_integration.rb,
lib/concurrent/edge/promises.rb,
lib/concurrent/edge/promises.rb

Overview

Represents a value which will become available in future. May reject with a reason instead, e.g. when the tasks raises an exception.

Direct Known Subclasses

ResolvableFuture

Defined Under Namespace

Modules: ActorIntegration, NewChannelIntegration, ThrottleIntegration

Instance Method Summary collapse

Methods included from NewChannelIntegration

#then_push_channel

Methods included from ActorIntegration

#then_ask

Methods included from ThrottleIntegration

#rescue_throttled_by, #then_throttled_by

Methods inherited from AbstractEventFuture

#chain, #chain_on, #chain_resolvable, #default_executor, #on_resolution, #on_resolution!, #on_resolution_using, #pending?, #resolved?, #state, #to_s, #touch, #wait

Methods included from AbstractEventFuture::ThrottleIntegration

#chain_throttled_by, #throttled_by

Instance Method Details

#any(event_or_future) ⇒ Future Also known as: |

Creates a new event which will be resolved when the first of receiver, ‘event_or_future` resolves. Returning future will have value nil if event_or_future is event and resolves first.

Returns:



1022
1023
1024
# File 'lib/concurrent/edge/promises.rb', line 1022

def any(event_or_future)
  AnyResolvedFuturePromise.new_blocked_by2(self, event_or_future, @DefaultExecutor).future
end

#delayFuture

Creates new future dependent on receiver which will not evaluate until touched, see AbstractEventFuture#touch. In other words, it inserts delay into the chain of Futures making rest of it lazy evaluated.

Returns:



1032
1033
1034
1035
# File 'lib/concurrent/edge/promises.rb', line 1032

def delay
  event = DelayPromise.new(@DefaultExecutor).event
  ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
end

#exception(*args) ⇒ Exception

Allows rejected Future to be risen with ‘raise` method.

Examples:

raise Promises.rejected_future(StandardError.new("boom"))

Returns:

  • (Exception)

Raises:

  • (StandardError)

    when raising not rejected future



959
960
961
962
963
964
965
966
967
# File 'lib/concurrent/edge/promises.rb', line 959

def exception(*args)
  raise Concurrent::Error, 'it is not rejected' unless rejected?
  reason = Array(internal_state.reason).compact
  if reason.size > 1
    Concurrent::MultipleErrors.new reason
  else
    reason[0].exception(*args)
  end
end

#flat_eventEvent

Creates new event which will be resolved when the returned event by receiver is. Be careful if the receiver rejects it will just resolve since Event does not hold reason.

Returns:



1067
1068
1069
# File 'lib/concurrent/edge/promises.rb', line 1067

def flat_event
  FlatEventPromise.new_blocked_by1(self, @DefaultExecutor).event
end

#flat_future(level = 1) ⇒ Future Also known as: flat

Creates new future which will have result of the future returned by receiver. If receiver rejects it will have its rejection.

Parameters:

  • level (Integer) (defaults to: 1)

    how many levels of futures should flatten

Returns:



1057
1058
1059
# File 'lib/concurrent/edge/promises.rb', line 1057

def flat_future(level = 1)
  FlatFuturePromise.new_blocked_by1(self, level, @DefaultExecutor).future
end

#fulfilled?(state = internal_state) ⇒ Boolean

Is it in fulfilled state?

Returns:

  • (Boolean)


891
892
893
# File 'lib/concurrent/edge/promises.rb', line 891

def fulfilled?(state = internal_state)
  state.resolved? && state.fulfilled?
end

#on_fulfillment(*args, &callback) ⇒ self

Shortcut of #on_fulfillment_using with default ‘:io` executor supplied.

Returns:

  • (self)

See Also:



1073
1074
1075
# File 'lib/concurrent/edge/promises.rb', line 1073

def on_fulfillment(*args, &callback)
  on_fulfillment_using @DefaultExecutor, *args, &callback
end

#on_fulfillment!(*args) {|value, *args| ... } ⇒ self

Stores the callback to be executed synchronously on resolving thread after it is fulfilled. Does nothing on rejection.

Parameters:

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • is forgotten.

Returns:

  • (self)


1084
1085
1086
# File 'lib/concurrent/edge/promises.rb', line 1084

def on_fulfillment!(*args, &callback)
  add_callback :callback_on_fulfillment, args, callback
end

#on_fulfillment_using(executor, *args) {|value, *args| ... } ⇒ self

Stores the callback to be executed asynchronously on executor after it is fulfilled. Does nothing on rejection.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • is forgotten.

Returns:

  • (self)


1096
1097
1098
# File 'lib/concurrent/edge/promises.rb', line 1096

def on_fulfillment_using(executor, *args, &callback)
  add_callback :async_callback_on_fulfillment, executor, args, callback
end

#on_rejection(*args, &callback) ⇒ self

Shortcut of #on_rejection_using with default ‘:io` executor supplied.

Returns:

  • (self)

See Also:



1102
1103
1104
# File 'lib/concurrent/edge/promises.rb', line 1102

def on_rejection(*args, &callback)
  on_rejection_using @DefaultExecutor, *args, &callback
end

#on_rejection!(*args) {|reason, *args| ... } ⇒ self

Stores the callback to be executed synchronously on resolving thread after it is rejected. Does nothing on fulfillment.

Parameters:

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • is forgotten.

Returns:

  • (self)


1113
1114
1115
# File 'lib/concurrent/edge/promises.rb', line 1113

def on_rejection!(*args, &callback)
  add_callback :callback_on_rejection, args, callback
end

#on_rejection_using(executor, *args) {|reason, *args| ... } ⇒ self

Stores the callback to be executed asynchronously on executor after it is rejected. Does nothing on fulfillment.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • is forgotten.

Returns:

  • (self)


1125
1126
1127
# File 'lib/concurrent/edge/promises.rb', line 1125

def on_rejection_using(executor, *args, &callback)
  add_callback :async_callback_on_rejection, executor, args, callback
end

#reason(timeout = nil) ⇒ Exception?

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned ‘nil` is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Returns reason of future’s rejection. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Exception, nil)

    nil on timeout or fulfillment.



925
926
927
# File 'lib/concurrent/edge/promises.rb', line 925

def reason(timeout = nil)
  internal_state.reason if wait_until_resolved timeout
end

#rejected?(state = internal_state) ⇒ Boolean

Is it in rejected state?

Returns:

  • (Boolean)


897
898
899
# File 'lib/concurrent/edge/promises.rb', line 897

def rejected?(state = internal_state)
  state.resolved? && !state.fulfilled?
end

#rescue(*args, &task) ⇒ Future

Shortcut of #rescue_on with default ‘:io` executor supplied.

Returns:

See Also:



989
990
991
# File 'lib/concurrent/edge/promises.rb', line 989

def rescue(*args, &task)
  rescue_on @DefaultExecutor, *args, &task
end

#rescue_on(executor, *args) {|reason, *args| ... } ⇒ Future

Chains the task to be executed asynchronously on executor after it rejects. Does not run the task if it fulfills. It will resolve though, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (reason, *args)

    to the task.

Yield Returns:

  • will become result of the returned Future. Its returned value becomes #value fulfilling it, raised exception becomes #reason rejecting it.

Returns:



1001
1002
1003
# File 'lib/concurrent/edge/promises.rb', line 1001

def rescue_on(executor, *args, &task)
  RescuePromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end

#result(timeout = nil) ⇒ Array(Boolean, Object, Exception)?

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Returns triplet fulfilled?, value, reason. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Array(Boolean, Object, Exception), nil)

    triplet of fulfilled?, value, reason, or nil on timeout.



936
937
938
# File 'lib/concurrent/edge/promises.rb', line 936

def result(timeout = nil)
  internal_state.result if wait_until_resolved timeout
end

#runFuture

Allows to use futures as green threads. The receiver has to evaluate to a future which represents what should be done next. It basically flattens indefinitely until non Future values is returned which becomes result of the returned future. Any encountered exception will become reason of the returned future.

Examples:

body = lambda do |v|
  v += 1
  v < 5 ? Promises.future(v, &body) : v
end
Promises.future(0, &body).run.value! # => 5

Returns:



1141
1142
1143
# File 'lib/concurrent/edge/promises.rb', line 1141

def run
  RunFuturePromise.new_blocked_by1(self, @DefaultExecutor).future
end

#schedule(intended_time) ⇒ Future

Creates new event dependent on receiver scheduled to execute on/in intended_time. In time is interpreted from the moment the receiver is resolved, therefore it inserts delay into the chain.

Parameters:

  • intended_time (Numeric, Time)

    ‘Numeric` means to run in `intended_time` seconds. `Time` means to run on `intended_time`.

Returns:



1039
1040
1041
1042
1043
1044
# File 'lib/concurrent/edge/promises.rb', line 1039

def schedule(intended_time)
  chain do
    event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
    ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
  end.flat
end

#then(*args, &task) ⇒ Future

Shortcut of #then_on with default ‘:io` executor supplied.

Returns:

See Also:



971
972
973
# File 'lib/concurrent/edge/promises.rb', line 971

def then(*args, &task)
  then_on @DefaultExecutor, *args, &task
end

#then_on(executor, *args) {|value, *args| ... } ⇒ Future

Chains the task to be executed asynchronously on executor after it fulfills. Does not run the task if it rejects. It will resolve though, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it, default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it’s executed. (It might be prepended with other arguments, see the @yeild section).

Yields:

  • (value, *args)

    to the task.

Yield Returns:

  • will become result of the returned Future. Its returned value becomes #value fulfilling it, raised exception becomes #reason rejecting it.

Returns:



983
984
985
# File 'lib/concurrent/edge/promises.rb', line 983

def then_on(executor, *args, &task)
  ThenPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end

#to_eventEvent

Converts future to event which is resolved when future is resolved by fulfillment or rejection.

Returns:



1153
1154
1155
1156
1157
# File 'lib/concurrent/edge/promises.rb', line 1153

def to_event
  event = Promises.resolvable_event
ensure
  chain_resolvable(event)
end

#to_futureFuture

Returns self, since this is a future

Returns:



1161
1162
1163
# File 'lib/concurrent/edge/promises.rb', line 1161

def to_future
  self
end

#value(timeout = nil) ⇒ Object?

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned ‘nil` is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Return value of the future. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Object, nil)

    the value of the Future when fulfilled, nil on timeout or rejection.



914
915
916
# File 'lib/concurrent/edge/promises.rb', line 914

def value(timeout = nil)
  internal_state.value if wait_until_resolved timeout
end

#value!(timeout = nil) ⇒ Object?

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Note:

Make sure returned ‘nil` is not confused with timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Return value of the future. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Object, nil)

    the value of the Future when fulfilled, nil on timeout.

Raises:

  • (Exception)

    #reason on rejection



950
951
952
# File 'lib/concurrent/edge/promises.rb', line 950

def value!(timeout = nil)
  internal_state.value if wait_until_resolved! timeout
end

#wait!(timeout = nil) ⇒ Future, ...

Note:

This function potentially blocks current thread until the Future is resolved. Be careful it can deadlock. Try to chain instead.

Wait (block the Thread) until receiver is AbstractEventFuture#resolved?. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in second to wait.

Returns:

  • (Future, true, false)

    self implies timeout was not used, true implies timeout was used and it was resolved, false implies it was not resolved within timeout.

Raises:

  • (Exception)

    #reason on rejection



942
943
944
945
# File 'lib/concurrent/edge/promises.rb', line 942

def wait!(timeout = nil)
  result = wait_until_resolved!(timeout)
  timeout ? result : self
end

#with_default_executor(executor) ⇒ Future

Crates new object with same class with the executor set as its new default executor. Any futures depending on it will use the new default executor.

Returns:



1048
1049
1050
# File 'lib/concurrent/edge/promises.rb', line 1048

def with_default_executor(executor)
  FutureWrapperPromise.new_blocked_by1(self, executor).future
end

#zip(other) ⇒ Future Also known as: &

Creates a new event or a future which will be resolved when receiver and other are. Returns an event if receiver and other are events, otherwise returns a future. If just one of the parties is Future then the result of the returned future is equal to the result of the supplied future. If both are futures then the result is as described in Concurrent::Promises::FactoryMethods#zip_futures_on.

Returns:



1007
1008
1009
1010
1011
1012
1013
# File 'lib/concurrent/edge/promises.rb', line 1007

def zip(other)
  if other.is_a?(Future)
    ZipFuturesPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  else
    ZipFutureEventPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  end
end