Class: Concurrent::Promise

Inherits:
Object
Includes:
Obligation
Defined in:
lib/concurrent/promise.rb

Methods included from Obligation

#completed?, #exception, #fulfilled?, #incomplete?, #no_error!, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait

Methods included from Dereferenceable

#value

Constructor Details

#initialize(opts = {}, &block) ⇒ Promise

Initialize a new Promise with the provided options.

Parameters:

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref

Options Hash (opts):

  • :parent (Promise)

    the parent `Promise` when building a chain/tree

  • :on_fulfill (Proc)

    fulfillment handler

  • :on_reject (Proc)

    rejection handler

  • :operation (Boolean) — default: false

    when `true` will execute the promise on the global operation pool (for long-running operations); when `false` will execute the promise on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)

  • :dup_on_deref (Boolean) — default: false

    call `#dup` before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call `#freeze` before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given `Proc`, passing it the internal value, and return whatever the proc returns

# File 'lib/concurrent/promise.rb', line 34

def initialize(opts = {}, &block)
  opts.delete_if { |k, v| v.nil? }

  @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
  @parent = opts.fetch(:parent) { nil }
  @on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } }
  @on_reject = opts.fetch(:on_reject) { Proc.new { |reason| raise reason } }

  @promise_body = block || Proc.new { |result| result }
  @state = :unscheduled
  @children = []

  init_obligation
end
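
The constructor discards `nil`-valued options before calling `fetch`, so an explicit `on_reject: nil` (which is what `#then` passes when no rescuer is given) falls back to the default handler rather than storing `nil`. The following is a minimal standalone sketch of that idiom. `resolve_handlers` is a hypothetical helper, not part of the library, and unlike the constructor it copies the hash instead of mutating the caller's.

```ruby
# Hypothetical helper (not part of concurrent-ruby) that mirrors the
# option handling shown in Promise#initialize.
def resolve_handlers(opts = {})
  opts = opts.dup # this sketch copies; the constructor mutates opts in place
  opts.delete_if { |_key, value| value.nil? }

  {
    on_fulfill: opts.fetch(:on_fulfill) { proc { |result| result } },
    on_reject:  opts.fetch(:on_reject)  { proc { |reason| raise reason } }
  }
end

handlers = resolve_handlers(on_fulfill: proc { |v| v * 2 }, on_reject: nil)
doubled  = handlers[:on_fulfill].call(21)

# The nil :on_reject was dropped, so the default re-raising handler is used.
reraised = begin
  handlers[:on_reject].call(ArgumentError.new('boom'))
  false
rescue ArgumentError
  true
end
```

Without the `delete_if` step, `fetch` would find the key and return `nil`, and a later call to the rejection handler would fail with `NoMethodError`.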

Class Method Details

.execute(opts = {}, &block) ⇒ Object

Since:

  • 0.5.0



# File 'lib/concurrent/promise.rb', line 75

def self.execute(opts = {}, &block)
  new(opts, &block).execute
end

.fulfill(value, opts = {}) ⇒ Promise

Returns:

  • (Promise)

# File 'lib/concurrent/promise.rb', line 50

def self.fulfill(value, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) }
end

.reject(reason, opts = {}) ⇒ Promise

Returns:

  • (Promise)

# File 'lib/concurrent/promise.rb', line 56

def self.reject(reason, opts = {})
  Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) }
end

.zip(*promises) ⇒ Promise<Array>

Builds a promise that produces the result of promises in an Array and fails if any of them fails.

Parameters:

  • promises (Array<Promise>)

Returns:

  • (Promise<Array>)

# File 'lib/concurrent/promise.rb', line 149

def self.zip(*promises)
  zero = fulfill([], executor: ImmediateExecutor.new)

  promises.reduce(zero) do |p1, p2|
    p1.flat_map do |results|
      p2.then do |next_result|
        results << next_result
      end
    end
  end
end
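
The fold in `.zip` may be easier to follow with threading removed. The sketch below uses `ToyPromise`, a hypothetical synchronous stand-in written only for this illustration. It is not `Concurrent::Promise` and it has no executor, state machine, or locking. It only reproduces the shape of the `reduce`/`flat_map`/`then` chain.

```ruby
# ToyPromise is a hypothetical, synchronous stand-in for illustration only.
class ToyPromise
  attr_reader :value, :reason

  def self.fulfill(value)
    new(value, nil)
  end

  def self.reject(reason)
    new(nil, reason)
  end

  def initialize(value, reason)
    @value = value
    @reason = reason
  end

  def rejected?
    !@reason.nil?
  end

  # Transform the value; a rejection passes through unchanged.
  def then
    return self if rejected?
    ToyPromise.fulfill(yield(@value))
  rescue StandardError => e
    ToyPromise.reject(e)
  end

  # Like #then, but the block itself returns a ToyPromise.
  def flat_map
    return self if rejected?
    yield(@value)
  rescue StandardError => e
    ToyPromise.reject(e)
  end

  # Same fold as Promise.zip: start from a fulfilled [] and append each result.
  def self.zip(*promises)
    promises.reduce(fulfill([])) do |acc, nxt|
      acc.flat_map do |results|
        nxt.then { |next_result| results << next_result }
      end
    end
  end
end

ok  = ToyPromise.zip(ToyPromise.fulfill(1), ToyPromise.fulfill(2), ToyPromise.fulfill(3))
bad = ToyPromise.zip(ToyPromise.fulfill(1), ToyPromise.reject(RuntimeError.new('boom')))
```

Here `ok.value` is `[1, 2, 3]` and `bad` is rejected with the first failure, which matches the "fails if any of them fails" behavior described above.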

Instance Method Details

#execute ⇒ Promise

Returns:

  • (Promise)

Since:

  • 0.5.0



# File 'lib/concurrent/promise.rb', line 62

def execute
  if root?
    if compare_and_set_state(:pending, :unscheduled)
      set_pending
      realize(@promise_body)
    end
  else
    @parent.execute
  end
  self
end
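
In `#execute`, only the root of a chain changes state and every other node delegates to its parent, so calling it on any node in a chain starts the root at most once. The sketch below illustrates that with `ToyNode`, a hypothetical class written for this example. The argument order of its `compare_and_set_state` (next state first, expected state second) is inferred from the call above and is an assumption about the library, and the counter stands in for `set_pending` and `realize`.

```ruby
# ToyNode is a hypothetical illustration, not the library's Promise.
class ToyNode
  attr_reader :state, :runs

  def initialize(parent = nil)
    @parent = parent
    @state = :unscheduled
    @runs = 0
    @lock = Mutex.new
  end

  def root?
    @parent.nil?
  end

  # Atomically move to next_state only if the current state is `expected`.
  def compare_and_set_state(next_state, expected)
    @lock.synchronize do
      return false unless @state == expected
      @state = next_state
      true
    end
  end

  # Only the root transitions; descendants forward the call upward.
  def execute
    if root?
      @runs += 1 if compare_and_set_state(:pending, :unscheduled)
    else
      @parent.execute
    end
    self
  end
end

root       = ToyNode.new
child      = ToyNode.new(root)
grandchild = ToyNode.new(child)

grandchild.execute
child.execute
root.execute
```

After the three calls, `root.runs` is still 1, because the second and third attempts find the state already `:pending`.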

#flat_map(&block) ⇒ Promise

Yield the successful result to the block that returns a promise. If that promise is also successful the result is the result of the yielded promise. If either part fails the whole also fails.

Examples:

Promise.execute { 1 }.flat_map { |v| Promise.execute { v + 2 } }.value! #=> 3

Returns:

  • (Promise)

# File 'lib/concurrent/promise.rb', line 122

def flat_map(&block)
  child = Promise.new(
    parent: self,
    executor: ImmediateExecutor.new,
  )

  on_error { |e| child.on_reject(e) }
  on_success do |result1|
    begin
      inner = block.call(result1)
      inner.execute
      inner.on_success { |result2| child.on_fulfill(result2) }
      inner.on_error { |e| child.on_reject(e) }
    rescue => e
      child.on_reject(e)
    end
  end

  child
end

#on_success(&block) ⇒ Promise

Returns:

  • (Promise)

Raises:

  • (ArgumentError)


# File 'lib/concurrent/promise.rb', line 101

def on_success(&block)
  raise ArgumentError.new('no block given') unless block_given?
  self.then &block
end

#rescue(&block) ⇒ Promise Also known as: catch, on_error

Returns:

  • (Promise)

# File 'lib/concurrent/promise.rb', line 107

def rescue(&block)
  self.then(block)
end

#then(rescuer = nil, &block) ⇒ Promise

Returns the new promise.

Returns:

  • (Promise)

Raises:

  • (ArgumentError)


# File 'lib/concurrent/promise.rb', line 80

def then(rescuer = nil, &block)
  raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given?
  block = Proc.new { |result| result } if block.nil?
  child = Promise.new(
    parent: self,
    executor: @executor,
    on_fulfill: block,
    on_reject: rescuer
  )

  mutex.synchronize do
    child.state = :pending if @state == :pending
    child.on_fulfill(apply_deref_options(@value)) if @state == :fulfilled
    child.on_reject(@reason) if @state == :rejected
    @children << child
  end

  child
end
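
The `then(rescuer = nil, &block)` signature explains how `#rescue` and `#on_success` differ. `#rescue` passes its block as the positional `rescuer`, so the fulfillment handler becomes the identity proc, while `#on_success` forwards its block as the block. A minimal sketch of that dispatch follows. `handlers_for` is a hypothetical helper that only reproduces the argument handling and builds no promise.

```ruby
# Hypothetical helper mirroring the argument handling of Promise#then.
def handlers_for(rescuer = nil, &block)
  raise ArgumentError, 'rescuers and block are both missing' if rescuer.nil? && block.nil?

  block ||= proc { |result| result } # identity when only a rescuer is given
  { on_fulfill: block, on_reject: rescuer }
end

recover = proc { |reason| "recovered: #{reason}" }

via_rescue = handlers_for(recover)      # the shape of #rescue: self.then(block)
via_then   = handlers_for { |v| v + 1 } # the shape of #on_success: self.then(&block)
```

In the `via_then` case `:on_reject` is `nil`, which the constructor then discards in favor of its default re-raising handler.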

#zip(*others) ⇒ Promise<Array>

Builds a promise that produces the result of self and others in an Array and fails if any of them fails.

Parameters:

  • others (Array<Promise>)

Returns:

  • (Promise<Array>)

# File 'lib/concurrent/promise.rb', line 167

def zip(*others)
  self.class.zip(self, *others)
end