Class: Concurrent::Promise
- Inherits:
-
Object
- Object
- Concurrent::Promise
- Includes:
- Obligation
- Defined in:
- lib/concurrent/promise.rb
Overview
Class Method Summary collapse
- .execute(opts = {}, &block) ⇒ Object
- .fulfill(value, opts = {}) ⇒ Promise
- .reject(reason, opts = {}) ⇒ Promise
-
.zip(*promises) ⇒ Promise<Array>
Builds a promise that produces the result of promises in an Array and fails if any of them fails.
Instance Method Summary collapse
- #execute ⇒ Promise
-
#flat_map(&block) ⇒ Promise
Yield the successful result to the block that returns a promise.
-
#initialize(opts = {}, &block) ⇒ Promise
constructor
Initialize a new Promise with the provided options.
- #on_success(&block) ⇒ Promise
- #rescue(&block) ⇒ Promise (also: #catch, #on_error)
-
#then(rescuer = nil, &block) ⇒ Promise
Registers a fulfillment block and/or a rejection handler on a new child promise and returns that new promise.
-
#zip(*others) ⇒ Promise<Array>
Builds a promise that produces the result of self and others in an Array and fails if any of them fails.
Methods included from Obligation
#completed?, #exception, #fulfilled?, #incomplete?, #no_error!, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait
Methods included from Dereferenceable
Constructor Details
#initialize(opts = {}, &block) ⇒ Promise
Initialize a new Promise with the provided options.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/concurrent/promise.rb', line 34
#
# Initialize a new Promise with the provided options.
#
# Options whose value is nil are ignored, so the defaults below apply to
# them. The caller's hash is never modified: it may be shared between
# several promises or frozen.
#
# @param opts [Hash] configuration; the keys read here are :parent,
#   :on_fulfill and :on_reject. The filtered hash is also handed to
#   OptionsParser.get_executor_from to select the executor.
# @yield the promise body; stored in @promise_body (an identity proc is
#   used when no block is given)
def initialize(opts = {}, &block)
  # Filter into a new hash instead of calling delete_if on the argument,
  # which would mutate (or raise on a frozen) caller-owned hash.
  opts = opts.reject { |_key, value| value.nil? }

  # Fall back to the globally configured pool when no executor is resolved.
  @executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_operation_pool
  @parent = opts.fetch(:parent) { nil }
  # Default handlers: pass a result through unchanged, re-raise a reason.
  @on_fulfill = opts.fetch(:on_fulfill) { Proc.new { |result| result } }
  @on_reject = opts.fetch(:on_reject) { Proc.new { |reason| raise reason } }

  @promise_body = block || Proc.new { |result| result }
  @state = :unscheduled
  @children = []

  init_obligation
end
Class Method Details
.execute(opts = {}, &block) ⇒ Object
75 76 77 |
# File 'lib/concurrent/promise.rb', line 75
#
# Build a promise from the given options and block, then call #execute on it.
#
# @param opts [Hash] options forwarded unchanged to the constructor
# @yield the promise body, forwarded to the constructor
# @return [Object] whatever the instance-level #execute returns
def self.execute(opts = {}, &block)
  promise = new(opts, &block)
  promise.execute
end
.fulfill(value, opts = {}) ⇒ Promise
50 51 52 |
# File 'lib/concurrent/promise.rb', line 50
#
# Create a promise and immediately set its state with the given value.
#
# @param value [Object] the value handed to synchronized_set_state!
# @param opts [Hash] options forwarded unchanged to Promise.new
# @return [Promise] the newly created promise
def self.fulfill(value, opts = {})
  promise = Promise.new(opts)
  # Invoked through send, so the setter is presumably not public.
  # Arguments are (success flag, value, reason).
  promise.send(:synchronized_set_state!, true, value, nil)
  promise
end
.reject(reason, opts = {}) ⇒ Promise
56 57 58 |
# File 'lib/concurrent/promise.rb', line 56
#
# Create a promise and immediately set its state with the given reason.
#
# @param reason [Object] the reason handed to synchronized_set_state!
# @param opts [Hash] options forwarded unchanged to Promise.new
# @return [Promise] the newly created promise
def self.reject(reason, opts = {})
  promise = Promise.new(opts)
  # Invoked through send, so the setter is presumably not public.
  # Arguments are (success flag, value, reason).
  promise.send(:synchronized_set_state!, false, nil, reason)
  promise
end
.zip(*promises) ⇒ Promise<Array>
Builds a promise that produces the result of promises in an Array and fails if any of them fails.
149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/concurrent/promise.rb', line 149
#
# Builds a promise that produces the result of promises in an Array and
# fails if any of them fails.
#
# @param promises [Array<Promise>] the promises whose results are collected
# @return [Promise<Array>] the promise produced by folding over +promises+
def self.zip(*promises)
  # Seed of the fold: a promise created via fulfill with an empty array,
  # using an ImmediateExecutor.
  seed = fulfill([], executor: ImmediateExecutor.new)

  promises.reduce(seed) do |collected, upcoming|
    collected.flat_map do |results|
      # Append in place; `<<` returns the array, which becomes the block's
      # result for the chained promise.
      upcoming.then { |next_result| results << next_result }
    end
  end
end
Instance Method Details
#execute ⇒ Promise
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/concurrent/promise.rb', line 62
#
# Start this promise. A non-root promise delegates to its parent; a root
# promise moves itself to pending and realizes its body, but only when the
# compare_and_set_state call succeeds.
#
# @return [Promise] self
def execute
  if !root?
    @parent.execute
  elsif compare_and_set_state(:pending, :unscheduled)
    set_pending
    realize(@promise_body)
  end
  self
end
#flat_map(&block) ⇒ Promise
Yield the successful result to the block that returns a promise. If that promise is also successful the result is the result of the yielded promise. If either part fails the whole also fails.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/concurrent/promise.rb', line 122
#
# Yield the successful result to the block that returns a promise. If that
# promise is also successful the result is the result of the yielded
# promise. If either part fails the whole also fails.
#
# @yield [result] the successful result of this promise
# @yieldreturn [Promise] the promise to continue with
# @return [Promise] the child promise that receives the combined outcome
def flat_map(&block)
  child = Promise.new(parent: self, executor: ImmediateExecutor.new)

  # A failure of this promise is forwarded straight to the child.
  on_error { |outer_error| child.on_reject(outer_error) }

  on_success do |outer_value|
    begin
      inner = block.call(outer_value)
      inner.execute
      inner.on_success { |inner_value| child.on_fulfill(inner_value) }
      inner.on_error { |inner_error| child.on_reject(inner_error) }
    rescue => raised
      # Anything raised while obtaining or wiring the inner promise also
      # rejects the child.
      child.on_reject(raised)
    end
  end

  child
end
#on_success(&block) ⇒ Promise
101 102 103 104 |
# File 'lib/concurrent/promise.rb', line 101
#
# Chain a block onto this promise via #then.
#
# @yield the block forwarded to #then
# @return [Promise] the promise returned by #then
# @raise [ArgumentError] if no block is given
def on_success(&block)
  raise ArgumentError, 'no block given' if block.nil?

  # `then` is a Ruby keyword, so the explicit receiver is required.
  self.then(&block)
end
#rescue(&block) ⇒ Promise Also known as: catch, on_error
107 108 109 |
# File 'lib/concurrent/promise.rb', line 107
#
# Chain a rescuer onto this promise. The given block is passed to #then as
# its positional +rescuer+ argument, not as its block.
#
# Also known as: catch, on_error
#
# @yield the block used as the rescuer
# @return [Promise] the promise returned by #then
def rescue(&block)
  rescuer = block
  # `then` is a Ruby keyword, so the explicit receiver is required.
  self.then(rescuer)
end
#then(rescuer = nil, &block) ⇒ Promise
Returns the new promise.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/concurrent/promise.rb', line 80
#
# Create a child promise of this one, using the block as its :on_fulfill
# handler and +rescuer+ as its :on_reject handler, and register it in
# @children.
#
# @param rescuer [Proc, nil] handler passed to the child as :on_reject
# @yield [result] handler passed to the child as :on_fulfill; an identity
#   proc is used when no block is given
# @return [Promise] the new promise
# @raise [ArgumentError] if both +rescuer+ and the block are missing
def then(rescuer = nil, &block)
  if rescuer.nil? && block.nil?
    raise ArgumentError, 'rescuers and block are both missing'
  end

  fulfill_handler = block || Proc.new { |result| result }
  child = Promise.new(
    parent: self,
    executor: @executor,
    on_fulfill: fulfill_handler,
    on_reject: rescuer
  )

  # Inspect this promise's state and register the child under the mutex.
  mutex.synchronize do
    case @state
    when :pending   then child.state = :pending
    when :fulfilled then child.on_fulfill(@value)
    when :rejected  then child.on_reject(@reason)
    end
    @children << child
  end

  child
end
#zip(*others) ⇒ Promise<Array>
Builds a promise that produces the result of self and others in an Array and fails if any of them fails.
167 168 169 |
# File 'lib/concurrent/promise.rb', line 167
#
# Builds a promise that produces the result of self and others in an Array
# and fails if any of them fails. Delegates to the class-level zip with
# self placed first.
#
# @param others [Array<Promise>] the promises to combine with this one
# @return [Promise<Array>] the promise returned by the class-level zip
def zip(*others)
  all_promises = [self, *others]
  self.class.zip(*all_promises)
end