Class: PromisePool::Promise

Inherits:
Object
  • Object
show all
Defined in:
lib/promise_pool/promise.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(timer = nil) ⇒ Promise

Returns a new instance of Promise.


23
24
25
26
27
28
29
30
31
# File 'lib/promise_pool/promise.rb', line 23

# Creates a promise that is not yet resolved: it has no value, error or
# result, and its callback list is empty.
#
# @param timer [Object, nil] optional timer object, stored unchanged
def initialize(timer = nil)
  # Outcome slots all start out as nil.
  self.result = nil
  self.error = nil
  self.value = nil
  self.resolved = false
  self.callbacks = []

  self.timer = timer
  # Synchronization primitives owned by this promise.
  self.condv = ConditionVariable.new
  self.mutex = Mutex.new
end

Class Method Details

.backtrace ⇒ Object


14
15
16
# File 'lib/promise_pool/promise.rb', line 14

# Returns the backtrace stored for the current thread under
# +Thread.current[:promise_pool_backtrace]+.
#
# @return [Array] the stored backtrace, or an empty array when nothing
#   (nil/false) is stored
def self.backtrace
  stored = Thread.current[:promise_pool_backtrace]
  stored || []
end

.claim(value, &callback) ⇒ Object


7
8
9
10
11
12
# File 'lib/promise_pool/promise.rb', line 7

# Builds a new promise and fulfills it immediately with +value+.
#
# When a block is given it is registered through +then+ before the promise
# is fulfilled.
#
# @param value [Object] the value handed to +fulfill+
# @yield optional callback registered via +then+
# @return [Object] the newly created promise
def self.claim(value, &callback)
  new.tap do |promise|
    promise.then(&callback) if callback
    promise.fulfill(value)
  end
end

.set_backtrace(e) ⇒ Object

should never raise!


19
20
21
# File 'lib/promise_pool/promise.rb', line 19

# Extends the backtrace of +e+ with the backtrace retained for the current
# thread (see +backtrace+).
#
# If +e+ has no backtrace yet, the current call stack (+caller+) is used as
# the starting point.
#
# NOTE: the accompanying documentation states this should never raise.
#
# @param e [Exception] the exception whose backtrace is replaced
# @return [Object] whatever +e.set_backtrace+ returns
def self.set_backtrace(e)
  origin = e.backtrace || caller
  e.set_backtrace(origin + backtrace)
end

Instance Method Details

#call ⇒ Object


55
56
57
58
59
# File 'lib/promise_pool/promise.rb', line 55

# Runs the given block in the calling thread.
#
# The current thread is recorded as this promise's working thread, then the
# block is yielded inside +protected_yield+ (defined elsewhere; per the
# original comment it does the job while avoiding any exception).
#
# @yield the job to run
# @return [self]
def call
  self.thread = Thread.current
  protected_yield do
    yield
  end
  self
end

#defer(pool = nil) ⇒ Object

called in client thread


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/promise_pool/promise.rb', line 34

# Schedules the given block to run asynchronously. Called in the client
# thread.
#
# Without a pool the block runs in a freshly created thread. With a pool,
# the work is handed to +pool.defer+ while this promise's mutex is held,
# and the returned task is stored.
#
# @param pool [Object, nil] optional pool responding to +defer(mutex)+
# @yield the job to run
# @return [self]
def defer(pool = nil)
  # Retain the backtrace so far so it can be attached in the worker.
  trace = caller + self.class.backtrace

  unless pool
    self.thread = Thread.new do
      Thread.current[:promise_pool_backtrace] = trace
      protected_yield { yield }
    end
    return self
  end

  mutex.synchronize do
    # Still time it out if the task is never processed.
    timer.on_timeout { cancel_task } if timer
    self.task = pool.defer(mutex) do
      Thread.current[:promise_pool_backtrace] = trace
      protected_yield { yield }
      # Clear the retained backtrace once the job is done.
      Thread.current[:promise_pool_backtrace] = nil
    end
  end
  self
end

#fulfill(value) ⇒ Object

called in requesting thread after the request is done


83
84
85
# File 'lib/promise_pool/promise.rb', line 83

# Fulfills the promise with +value+. Called in the requesting thread after
# the request is done.
#
# Delegates to +fulfilling+ (defined elsewhere) while holding this
# promise's mutex.
#
# @param value [Object] the value to fulfill with
# @return [Object] whatever +fulfilling+ returns
def fulfill(value)
  mutex.synchronize do
    fulfilling(value)
  end
end

#future ⇒ Object


61
62
63
# File 'lib/promise_pool/promise.rb', line 61

# Wraps this promise in a new Future.
#
# @return [Future] a Future constructed with this promise as its argument
def future
  Future.new(self)
end

#reject(error) ⇒ Object

called in requesting thread if something goes wrong or timed out


88
89
90
# File 'lib/promise_pool/promise.rb', line 88

# Rejects the promise with +error+. Called in the requesting thread if
# something goes wrong or the request timed out.
#
# Delegates to +rejecting+ (defined elsewhere) while holding this promise's
# mutex.
#
# @param error [Object] the error to reject with
# @return [Object] whatever +rejecting+ returns
def reject(error)
  mutex.synchronize do
    rejecting(error)
  end
end

#resolved? ⇒ Boolean

Returns:

  • (Boolean)

98
99
100
# File 'lib/promise_pool/promise.rb', line 98

# Reports this promise's current +resolved+ flag.
#
# @return [Boolean] the value of +resolved+ (set to false by the
#   constructor)
def resolved?
  resolved
end

#started? ⇒ Boolean

Returns:

  • (Boolean)

102
103
104
# File 'lib/promise_pool/promise.rb', line 102

# Tells whether this promise has a working thread.
#
# @return [Boolean] true when +working_thread+ (defined elsewhere) is
#   truthy, false otherwise
def started?
  working_thread ? true : false
end

#then(&action) ⇒ Object

Appends an action to be called when the promise calls back.


93
94
95
96
# File 'lib/promise_pool/promise.rb', line 93

# Registers an action on this promise by appending the given block to
# +callbacks+.
#
# @yield the action to append
# @return [self] so calls can be chained
def then(&action)
  callbacks.push(action)
  self
end

#wait ⇒ Object

called in client thread (client.wait)


66
67
68
69
# File 'lib/promise_pool/promise.rb', line 66

# Blocks the calling thread until the promise is resolved. Called in the
# client thread (client.wait).
#
# Returns immediately when the promise is already resolved.
#
# @return [nil]
def wait
  return if resolved?

  mutex.synchronize do
    # Re-check after every wake-up: it might be awakened by some other
    # future.
    condv.wait(mutex) until resolved?
  end
end

#yield ⇒ Object

called in client thread (from the future (e.g. body))


72
73
74
75
76
77
78
79
80
# File 'lib/promise_pool/promise.rb', line 72

# Waits for the promise and hands back its result. Called in the client
# thread (from the future, e.g. body).
#
# @return [Object] +result+ when it is not an Exception
# @raise [Exception] +result+ itself when it is an Exception
def yield
  wait
  # Same membership check a `case ... when Exception` performs.
  raise result if Exception === result

  result
end