Class: RestCore::Promise

Inherits:
Object
  • Object
show all
Includes:
RestCore
Defined in:
lib/rest-core/promise.rb

Defined Under Namespace

Classes: Future

Constant Summary

Constants included from RestCore

ASYNC, CLIENT, DRY, FAIL, HIJACK, LOG, PROMISE, REQUEST_HEADERS, REQUEST_METHOD, REQUEST_PATH, REQUEST_PAYLOAD, REQUEST_QUERY, REQUEST_URI, RESPONSE_BODY, RESPONSE_HEADERS, RESPONSE_KEY, RESPONSE_SOCKET, RESPONSE_STATUS, Simple, TIMER, Universal, VERSION

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RestCore

eagerload, id

Constructor Details

#initialize(env, k = RC.id, immediate = false, &job) ⇒ Promise

Returns a new instance of Promise.



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rest-core/promise.rb', line 33

# Sets up a fresh promise.
#
# env       - stored as-is on the promise
# k         - first callback; kept inside an array so more can be appended
# immediate - stored as-is on the promise
# job       - optional block; when present it is passed straight to defer
def initialize env, k=RC.id, immediate=false, &job
  self.env       = env
  self.k         = [k]
  self.immediate = immediate

  # every result slot starts out empty
  self.body     = nil
  self.status   = nil
  self.headers  = nil
  self.socket   = nil
  self.response = nil
  self.error    = nil
  self.called   = nil

  # synchronisation primitives used together by the other methods
  self.condv = ConditionVariable.new
  self.mutex = Mutex.new

  defer(&job) if job
end

Class Method Details

.backtrace ⇒ Object



24
25
26
# File 'lib/rest-core/promise.rb', line 24

# Returns whatever is stored under :backtrace on the current thread,
# or an empty array when nothing has been stored there.
def self.backtrace
  stored = Thread.current[:backtrace]
  return stored if stored

  []
end

.claim(env, k = RC.id, body, status, headers) ⇒ Object



18
19
20
21
22
# File 'lib/rest-core/promise.rb', line 18

# Builds a promise for env (using env[ASYNC] as its immediate flag),
# fulfills it right away with the given body, status and headers,
# and returns that promise.
def self.claim env, k=RC.id, body, status, headers
  new(env, k, env[ASYNC]).tap do |promise|
    promise.fulfill(body, status, headers)
  end
end

.set_backtrace(e) ⇒ Object

should never raise!



29
30
31
# File 'lib/rest-core/promise.rb', line 29

# Extends the backtrace of e with the backtrace kept by this class.
# When e has no backtrace yet, the current call stack is used as its base.
def self.set_backtrace e
  base = e.backtrace || caller
  e.set_backtrace(base + backtrace)
end

Instance Method Details

#defer ⇒ Object

called in client thread



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rest-core/promise.rb', line 66

# Starts the job given as a block, picking where it runs from pool_size:
#   * negative  - run right away on the calling thread
#   * positive  - hand the job to client_class.thread_pool
#   * otherwise - run it on a freshly created Thread
# In every case the block is invoked through protected_yield.
def defer
  if pool_size < 0 # negative number for blocking call
    self.thread = Thread.current # set working thread
    protected_yield{ yield } # avoid any exception and do the job
  else
    backtrace = caller + self.class.backtrace # retain the backtrace so far
    if pool_size > 0
      # the timeout hook and the pool submission both happen under the mutex
      mutex.synchronize do
        # still timing it out if the task never processed
        env[TIMER].on_timeout{ cancel_task } if env[TIMER]
        # keep whatever the pool returns in task
        self.task = client_class.thread_pool.defer(mutex) do
          # make the retained backtrace visible on the thread running the job
          Thread.current[:backtrace] = backtrace
          protected_yield{ yield }
          # clear it again once the job has run
          Thread.current[:backtrace] = nil
        end
      end
    else
      # no pool: one dedicated thread for this job, remembered in thread
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end

#done ⇒ Object

called in Client.defer to mark this promise as done



122
123
124
# File 'lib/rest-core/promise.rb', line 122

# Fulfills this promise with placeholder values: an empty string body,
# a status of 0 and an empty headers hash.
def done
  blank_body, blank_status, blank_headers = '', 0, {}
  fulfill(blank_body, blank_status, blank_headers)
end

#done? ⇒ Boolean

It’s considered done only if the HTTP request is done and, when we’re in asynchronous mode, the callback has been called as well. In synchronous mode, since we’re waiting for the callback anyway, we don’t really have to check whether it has been called.

Returns:

  • (Boolean)


130
131
132
# File 'lib/rest-core/promise.rb', line 130

# Tells whether this promise is finished.
# Without a status it is never done. With a status it is done unless
# immediate is set, in which case the answer is the value of called.
def done?
  return false unless status

  !immediate || called
end

#fulfill(body, status, headers, socket = nil) ⇒ Object

called in requesting thread after the request is done



104
105
106
107
# File 'lib/rest-core/promise.rb', line 104

# Cancels the timer found in env[TIMER], if any, and then runs
# fulfilling with the given values while holding the mutex.
# socket is optional and defaults to nil.
def fulfill body, status, headers, socket=nil
  timer = env[TIMER]
  timer.cancel if timer

  mutex.synchronize do
    fulfilling(body, status, headers, socket)
  end
end

#future_body ⇒ Object



51
# File 'lib/rest-core/promise.rb', line 51

# Builds a Future from this promise and the RESPONSE_BODY key.
def future_body
  Future.new(self, RESPONSE_BODY)
end

#future_failures ⇒ Object



55
# File 'lib/rest-core/promise.rb', line 55

# Builds a Future from this promise and the FAIL key.
def future_failures
  Future.new(self, FAIL)
end

#future_headers ⇒ Object



53
# File 'lib/rest-core/promise.rb', line 53

# Builds a Future from this promise and the RESPONSE_HEADERS key.
def future_headers
  Future.new(self, RESPONSE_HEADERS)
end

#future_response ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/rest-core/promise.rb', line 56

# Returns a copy of env (via merge) in which every response slot holds
# the matching future and PROMISE points back at this promise.
def future_response
  request = env
  overlay = {
    RESPONSE_BODY    => future_body,
    RESPONSE_STATUS  => future_status,
    RESPONSE_HEADERS => future_headers,
    RESPONSE_SOCKET  => future_socket,
    FAIL             => future_failures,
    PROMISE          => self
  }
  request.merge(overlay)
end

#future_socket ⇒ Object



54
# File 'lib/rest-core/promise.rb', line 54

# Builds a Future from this promise and the RESPONSE_SOCKET key.
def future_socket
  Future.new(self, RESPONSE_SOCKET)
end

#future_status ⇒ Object



52
# File 'lib/rest-core/promise.rb', line 52

# Builds a Future from this promise and the RESPONSE_STATUS key.
def future_status
  Future.new(self, RESPONSE_STATUS)
end

#inspect ⇒ Object



47
48
49
# File 'lib/rest-core/promise.rb', line 47

# Short description made of the class name and the value of env[REQUEST_URI].
def inspect
  owner  = self.class.name
  target = env[REQUEST_URI]
  "<#{owner} for #{target}>"
end

#reject(error) ⇒ Object

called in requesting thread if something goes wrong or timed out



110
111
112
113
# File 'lib/rest-core/promise.rb', line 110

# Cancels the timer found in env[TIMER], if any, and then runs
# rejecting with the given error while holding the mutex.
def reject error
  timer = env[TIMER]
  timer.cancel if timer

  mutex.synchronize do
    rejecting(error)
  end
end

#then(&action) ⇒ Object

append your actions, which would be called when we’re calling back



116
117
118
119
# File 'lib/rest-core/promise.rb', line 116

# Adds the given block to the end of k and returns the promise itself,
# so calls can be chained.
def then &action
  k.push(action)
  self
end

#wait ⇒ Object

called in client thread (client.wait)



92
93
94
95
# File 'lib/rest-core/promise.rb', line 92

# Blocks until done? is true. Returns at once when it already is.
def wait
  return if done?

  mutex.synchronize do
    # a wake-up may come from some other future, so re-check every time
    condv.wait(mutex) until done?
  end
end

#yield ⇒ Object

called in client thread (from the future (e.g. body))



98
99
100
101
# File 'lib/rest-core/promise.rb', line 98

# Calls wait first, then calls callback and returns whatever callback returns.
def yield
  wait
  callback
end