Class: RestCore::Promise

Inherits:
Object
  • Object
show all
Includes:
RestCore
Defined in:
lib/rest-core/promise.rb

Defined Under Namespace

Classes: Future

Constant Summary

Constants included from RestCore

ASYNC, CLIENT, DRY, FAIL, HIJACK, LOG, PROMISE, REQUEST_HEADERS, REQUEST_METHOD, REQUEST_PATH, REQUEST_PAYLOAD, REQUEST_QUERY, REQUEST_URI, RESPONSE_BODY, RESPONSE_HEADERS, RESPONSE_KEY, RESPONSE_SOCKET, RESPONSE_STATUS, Simple, TIMER, Universal, VERSION

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RestCore

eagerload, id

Constructor Details

#initialize(env, k = RC.id, immediate = false, &job) ⇒ Promise

Returns a new instance of Promise.



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rest-core/promise.rb', line 32

def initialize env, k=RC.id, immediate=false, &job
  self.env       = env
  self.k         = [k]
  self.immediate = immediate

  self.body, self.status, self.headers, self.socket,
    self.response, self.error, self.called = nil

  self.condv     = ConditionVariable.new
  self.mutex     = Mutex.new

  defer(&job) if job
end

Class Method Details

.backtraceObject



24
25
26
# File 'lib/rest-core/promise.rb', line 24

def self.backtrace
  Thread.current[:backtrace] || []
end

.claim(env, k = RC.id, body, status, headers) ⇒ Object



18
19
20
21
22
# File 'lib/rest-core/promise.rb', line 18

def self.claim env, k=RC.id, body, status, headers
  promise = new(env, k)
  promise.fulfill(body, status, headers)
  promise
end

.set_backtrace(e) ⇒ Object



28
29
30
# File 'lib/rest-core/promise.rb', line 28

def self.set_backtrace e
  e.set_backtrace((e.backtrace || caller) + backtrace)
end

Instance Method Details

#deferObject

called in client thread



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/rest-core/promise.rb', line 65

def defer
  if pool_size < 0 # negative number for blocking call
    self.thread = Thread.current
    # set timeout after thread set, before yield (because yield is blocking)
    env[TIMER].on_timeout{ cancel_task } if env[TIMER]
    protected_yield{ yield }
  else
    backtrace = caller + self.class.backtrace
    if pool_size > 0
      self.task = client_class.thread_pool.defer(mutex) do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
    # set timeout after thread/task set
    env[TIMER].on_timeout{ cancel_task } if env[TIMER]
  end
end

#done?Boolean

It’s considered done only if the HTTP request is done, and we’re not in asynchronous mode otherwise the callback should be called first. For synchronous mode, since we’re waiting for the callback anyway, we don’t really have to check if it’s called.

Returns:

  • (Boolean)


123
124
125
# File 'lib/rest-core/promise.rb', line 123

def done?
  !!status && (!immediate || called)
end

#fulfill(body, status, headers, socket = nil) ⇒ Object

called in requesting thread after the request is done



102
103
104
105
# File 'lib/rest-core/promise.rb', line 102

def fulfill body, status, headers, socket=nil
  env[TIMER].cancel if env[TIMER]
  mutex.synchronize{ fulfilling(body, status, headers, socket) }
end

#future_bodyObject



50
# File 'lib/rest-core/promise.rb', line 50

def future_body    ; Future.new(self, RESPONSE_BODY   ); end

#future_failuresObject



54
# File 'lib/rest-core/promise.rb', line 54

def future_failures; Future.new(self, FAIL)            ; end

#future_headersObject



52
# File 'lib/rest-core/promise.rb', line 52

def future_headers ; Future.new(self, RESPONSE_HEADERS); end

#future_responseObject



55
56
57
58
59
60
61
62
# File 'lib/rest-core/promise.rb', line 55

def future_response
  env.merge(RESPONSE_BODY    => future_body,
            RESPONSE_STATUS  => future_status,
            RESPONSE_HEADERS => future_headers,
            RESPONSE_SOCKET  => future_socket,
            FAIL             => future_failures,
            PROMISE          => self)
end

#future_socketObject



53
# File 'lib/rest-core/promise.rb', line 53

def future_socket  ; Future.new(self, RESPONSE_SOCKET ); end

#future_statusObject



51
# File 'lib/rest-core/promise.rb', line 51

def future_status  ; Future.new(self, RESPONSE_STATUS ); end

#inspectObject



46
47
48
# File 'lib/rest-core/promise.rb', line 46

def inspect
  "<#{self.class.name} for #{env[REQUEST_URI]}>"
end

#reject(error) ⇒ Object

called in requesting thread if something goes wrong or timed out



108
109
110
111
# File 'lib/rest-core/promise.rb', line 108

def reject error
  env[TIMER].cancel if env[TIMER]
  mutex.synchronize{ rejecting(error) }
end

#then(&action) ⇒ Object

append your actions, which would be called when we’re calling back



114
115
116
117
# File 'lib/rest-core/promise.rb', line 114

def then &action
  k << action
  self
end

#waitObject

called in client thread (client.wait)



90
91
92
93
# File 'lib/rest-core/promise.rb', line 90

def wait
  # it might be awaken by some other futures!
  mutex.synchronize{ condv.wait(mutex) until done? } unless done?
end

#yieldObject

called in client thread (from the future (e.g. body))



96
97
98
99
# File 'lib/rest-core/promise.rb', line 96

def yield
  wait
  callback
end