Class: RestCore::Promise

Inherits:
Object
  • Object
show all
Includes:
RestCore
Defined in:
lib/rest-core/promise.rb

Defined Under Namespace

Classes: Future

Constant Summary

Constants included from RestCore

ASYNC, CLIENT, DRY, FAIL, HIJACK, LOG, PROMISE, REQUEST_HEADERS, REQUEST_METHOD, REQUEST_PATH, REQUEST_PAYLOAD, REQUEST_QUERY, REQUEST_URI, RESPONSE_BODY, RESPONSE_HEADERS, RESPONSE_KEY, RESPONSE_SOCKET, RESPONSE_STATUS, Simple, TIMER, Universal, VERSION

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RestCore

eagerload, id

Constructor Details

#initialize(env, k = RC.id, immediate = false, &job) ⇒ Promise

Returns a new instance of Promise.



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rest-core/promise.rb', line 24

# Builds a promise for the request environment +env+.
#
# env       - request environment; kept as-is and consulted for TIMER by
#             #defer and #fulfill
# k         - first continuation; stored inside an Array so that #then can
#             append further actions (defaults to RC.id)
# immediate - when truthy, #fulfill invokes callback_in_async right away
# job       - optional block; when given it is passed on to #defer
def initialize(env, k = RC.id, immediate = false, &job)
  self.env       = env
  self.k         = [k]
  self.immediate = immediate

  # Nothing has been received or has failed yet.
  self.body     = nil
  self.status   = nil
  self.headers  = nil
  self.socket   = nil
  self.response = nil
  self.error    = nil

  # Synchronisation primitives used by #wait and #fulfill.
  self.condv = ConditionVariable.new
  self.mutex = Mutex.new

  return unless job

  defer(&job)
end

Class Method Details

.claim(env, k = RC.id, body, status, headers) ⇒ Object



18
19
20
21
22
# File 'lib/rest-core/promise.rb', line 18

# Creates a promise for +env+ (with continuation +k+, defaulting to RC.id)
# and fulfills it straight away with the given body, status and headers.
#
# Returns the newly created promise.
def self.claim(env, k = RC.id, body, status, headers)
  new(env, k).tap { |claimed| claimed.fulfill(body, status, headers) }
end

Instance Method Details

#defer(&job) ⇒ Object

called in client thread



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/rest-core/promise.rb', line 57

# Called in the client thread. Runs +job+ according to pool_size:
#
# * negative - the job is called inline (blocking call)
# * positive - the job is handed to the client class's thread pool and the
#              value returned by the pool is remembered as the task
# * zero     - the job runs in a freshly spawned thread
#
# Afterwards, if env carries a TIMER, a timeout handler is registered that
# rejects this promise with the timer's error.
def defer(&job)
  if pool_size < 0 # negative number for blocking call
    job.call
  else
    # Both asynchronous paths run the job through synchronized_yield.
    work = proc { synchronized_yield { job.call } }

    if pool_size > 0
      self.task = client_class.thread_pool.defer(&work)
    else
      Thread.new(&work)
    end
  end

  timer = env[TIMER]
  timer.on_timeout { reject(env[TIMER].error) } if timer
end

#fulfill(body, status, headers, socket = nil) ⇒ Object

called in requesting thread after the request is done



83
84
85
86
87
88
89
90
# File 'lib/rest-core/promise.rb', line 83

# Called in the requesting thread after the request is done.
#
# Cancels the TIMER found in env (if any), records the response parts on
# this promise, runs callback_in_async when the promise is immediate, and
# finally broadcasts on the condition variable.
#
# body, status, headers - response parts to record
# socket                - optional socket to record (defaults to nil)
def fulfill(body, status, headers, socket = nil)
  timer = env[TIMER]
  timer.cancel if timer

  self.body    = body
  self.status  = status
  self.headers = headers
  self.socket  = socket

  # under ASYNC callback, should call immediately
  callback_in_async if immediate

  # client or response might be waiting
  condv.broadcast
end

#future_body ⇒ Object



42
# File 'lib/rest-core/promise.rb', line 42

# Builds a Future from this promise and the RESPONSE_BODY key.
def future_body
  Future.new(self, RESPONSE_BODY)
end

#future_failures ⇒ Object



46
# File 'lib/rest-core/promise.rb', line 46

# Builds a Future from this promise and the FAIL key.
def future_failures
  Future.new(self, FAIL)
end

#future_headers ⇒ Object



44
# File 'lib/rest-core/promise.rb', line 44

# Builds a Future from this promise and the RESPONSE_HEADERS key.
def future_headers
  Future.new(self, RESPONSE_HEADERS)
end

#future_response ⇒ Object



47
48
49
50
51
52
53
54
# File 'lib/rest-core/promise.rb', line 47

# Returns a copy of env (via merge) in which the response keys, FAIL and
# PROMISE are replaced by the corresponding futures and by this promise.
def future_response
  overrides = {
    RESPONSE_BODY    => future_body,
    RESPONSE_STATUS  => future_status,
    RESPONSE_HEADERS => future_headers,
    RESPONSE_SOCKET  => future_socket,
    FAIL             => future_failures,
    PROMISE          => self
  }
  env.merge(overrides)
end

#future_socket ⇒ Object



45
# File 'lib/rest-core/promise.rb', line 45

# Builds a Future from this promise and the RESPONSE_SOCKET key.
def future_socket
  Future.new(self, RESPONSE_SOCKET)
end

#future_status ⇒ Object



43
# File 'lib/rest-core/promise.rb', line 43

# Builds a Future from this promise and the RESPONSE_STATUS key.
def future_status
  Future.new(self, RESPONSE_STATUS)
end

#inspect ⇒ Object



38
39
40
# File 'lib/rest-core/promise.rb', line 38

# Short description made of the class name and the REQUEST_URI found in env.
def inspect
  format('<%s for %s>', self.class.name, env[REQUEST_URI])
end

#reject(error) ⇒ Object

called in requesting thread if something goes wrong or timed out



93
94
95
96
97
98
99
100
101
102
# File 'lib/rest-core/promise.rb', line 93

# Called in the requesting thread if something goes wrong or timed out.
#
# Cancels the task when one is set, records the failure and then fulfills
# the promise with an empty body, status 0 and no headers.
#
# error - an Exception is stored as-is; anything else is wrapped in Error,
#         with 'unknown' standing in for a nil/false value
def reject(error)
  running = task
  running.cancel if running

  failure = error.kind_of?(Exception) ? error : Error.new(error || 'unknown')
  self.error = failure

  fulfill('', 0, {})
end

#then(&action) ⇒ Object

append your actions, which would be called when we’re calling back



105
106
107
108
# File 'lib/rest-core/promise.rb', line 105

# Appends +action+ to the list of continuations (k) and returns the promise
# itself so that calls can be chained.
def then(&action)
  tap { k << action }
end

#wait ⇒ Object

called in client thread (client.wait)



71
72
73
74
# File 'lib/rest-core/promise.rb', line 71

# Called in the client thread (client.wait). Returns at once when a status
# is already set; otherwise waits on the condition variable until it is.
def wait
  return if status

  mutex.synchronize do
    # A wake-up may come from some other future, so re-check the status
    # every time before leaving the loop.
    condv.wait(mutex) until status
  end
end

#yield ⇒ Object

called in client thread (from the future (e.g. body))



77
78
79
80
# File 'lib/rest-core/promise.rb', line 77

# Called in the client thread (from a future, e.g. body). Waits first and
# then returns whatever callback returns.
def yield
  self.wait
  callback
end