Class: CloudI::API
Constant Summary collapse
- ASYNC =
unbuffered output is with $stderr.puts ‘…’
1
- SYNC =
-1
Class Method Summary collapse
Instance Method Summary collapse
- #forward_(command, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #info_key_value_parse(message_info) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll ⇒ Object
- #prefix ⇒ Object
- #process_count ⇒ Object
- #process_count_max ⇒ Object
- #process_count_min ⇒ Object
- #process_index ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #request_http_qs_parse(request) ⇒ Object
- #return_(command, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #timeout_async ⇒ Object
- #timeout_sync ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
Returns a new instance of API.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/cloudi.rb', line 57
# Builds an API instance on top of an already-open file descriptor.
#
# Reads CLOUDI_API_INIT_PROTOCOL and CLOUDI_API_INIT_BUFFER_SIZE through
# API.getenv, wraps descriptor (thread_index + 3) in an IO object with sync
# mode enabled, sends the :init term and then calls poll_request(false).
#
# @param thread_index [Integer] offset added to 3 to select the descriptor
# @raise [InvalidInputException] when the protocol is not 'tcp', 'udp' or 'local'
def initialize(thread_index)
  protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
  buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
  # The supported protocols differ only in whether a header is used.
  use_header = case protocol
               when 'tcp', 'local' then true
               when 'udp' then false
               else raise InvalidInputException
               end
  @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
  @s.sync = true
  @use_header = use_header
  @initialization_complete = false
  @size = buffer_size_str.to_i
  @callbacks = Hash.new
  send(term_to_binary(:init))
  poll_request(false)
end
Class Method Details
.assert ⇒ Object
591 592 593 |
# File 'lib/cloudi.rb', line 591
# Runs the given block and fails when it yields a falsy value.
#
# @yieldreturn [Object] the condition being asserted
# @return [nil] when the block's result is truthy
# @raise [RuntimeError] 'Assertion failed !' when the block's result is falsy
def self.assert
  return if yield # if $DEBUG

  raise 'Assertion failed !'
end
.thread_count ⇒ Object
82 83 84 85 |
# File 'lib/cloudi.rb', line 82
# Reads CLOUDI_API_INIT_THREAD_COUNT through getenv and converts it with to_i.
#
# @return [Integer] the result of calling to_i on the value getenv returns
def self.thread_count
  getenv('CLOUDI_API_INIT_THREAD_COUNT').to_i
end
Instance Method Details
#forward_(command, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/cloudi.rb', line 163
# Dispatches a forward to forward_async or forward_sync based on command.
#
# @param command [Integer] ASYNC or SYNC
# @param name, request_info, request, timeout, priority, trans_id, pid
#   passed through unchanged to the selected forward method
# @raise [InvalidInputException] when command is neither ASYNC nor SYNC
#   (previously such a command was ignored and nothing was forwarded)
def forward_(command, name, request_info, request,
             timeout, priority, trans_id, pid)
  case command
  when ASYNC
    forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
  when SYNC
    forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
  else
    # An unknown command would otherwise leave the request unanswered.
    raise InvalidInputException
  end
end
#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/cloudi.rb', line 175
# Sends a :forward_async term and then raises ForwardAsyncException.
#
# When @requestTimeoutAdjustment is set and timeout equals @request_timeout,
# the milliseconds elapsed since @request_timer are subtracted from timeout
# (the result is 0 when more time has elapsed than the timeout allows).
#
# @raise [ForwardAsyncException] always, after the term has been sent
def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
  if @requestTimeoutAdjustment && timeout == @request_timeout
    # Whole milliseconds spent so far; never negative.
    elapsed = ((Time.now - @request_timer) * 1000.0).floor
    elapsed = 0 if elapsed < 0
    timeout = elapsed > timeout ? 0 : timeout - elapsed
  end
  message = [:forward_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority,
             OtpErlangBinary.new(trans_id), pid]
  send(term_to_binary(message))
  raise ForwardAsyncException.new()
end
#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/cloudi.rb', line 196
# Sends a :forward_sync term and then raises ForwardSyncException.
#
# When @requestTimeoutAdjustment is set and timeout equals @request_timeout,
# the milliseconds elapsed since @request_timer are subtracted from timeout
# (the result is 0 when more time has elapsed than the timeout allows).
#
# @raise [ForwardSyncException] always, after the term has been sent
def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
  if @requestTimeoutAdjustment && timeout == @request_timeout
    # Whole milliseconds spent so far; never negative.
    elapsed = ((Time.now - @request_timer) * 1000.0).floor
    elapsed = 0 if elapsed < 0
    timeout = elapsed > timeout ? 0 : timeout - elapsed
  end
  message = [:forward_sync, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority,
             OtpErlangBinary.new(trans_id), pid]
  send(term_to_binary(message))
  raise ForwardSyncException.new()
end
#info_key_value_parse(message_info) ⇒ Object
587 588 589 |
# File 'lib/cloudi.rb', line 587
# Parses message_info by delegating to binary_key_value_parse.
#
# The message_info parameter matches the documented signature and is passed
# through, in the same way request_http_qs_parse passes its request.
#
# @param message_info [Object] value handed unchanged to binary_key_value_parse
# @return [Object] whatever binary_key_value_parse returns
def info_key_value_parse(message_info)
  binary_key_value_parse(message_info)
end
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/cloudi.rb', line 145
# Sends an :mcast_async term and returns the result of poll_request(false).
#
# @param timeout [Object, nil] defaults to @timeoutAsync when nil
# @param request_info [String, nil] defaults to '' when nil
# @param priority [Object, nil] defaults to @priorityDefault when nil
# @return [Object] whatever poll_request(false) returns
def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
  timeout = @timeoutAsync if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priorityDefault if priority.nil?
  message = [:mcast_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(term_to_binary(message))
  poll_request(false)
end
#poll ⇒ Object
563 564 565 |
# File 'lib/cloudi.rb', line 563
# Public entry point that calls poll_request with true.
#
# @return [Object] whatever poll_request(true) returns
def poll
  external = true
  poll_request(external)
end
#prefix ⇒ Object
303 304 305 |
# File 'lib/cloudi.rb', line 303
# Reader for @prefix (the value subscribe/unsubscribe prepend to a pattern
# when building a callback key).
#
# @return [Object] the current value of @prefix
def prefix
  @prefix
end
#process_count ⇒ Object
291 292 293 |
# File 'lib/cloudi.rb', line 291
# Reader for @processCount.
#
# @return [Object] the current value of @processCount
def process_count
  @processCount
end
#process_count_max ⇒ Object
295 296 297 |
# File 'lib/cloudi.rb', line 295
# Reader for @processCountMax.
#
# @return [Object] the current value of @processCountMax
def process_count_max
  @processCountMax
end
#process_count_min ⇒ Object
299 300 301 |
# File 'lib/cloudi.rb', line 299
# Reader for @processCountMin.
#
# @return [Object] the current value of @processCountMin
def process_count_min
  @processCountMin
end
#process_index ⇒ Object
287 288 289 |
# File 'lib/cloudi.rb', line 287
# Reader for @processIndex.
#
# @return [Object] the current value of @processIndex
def process_index
  @processIndex
end
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/cloudi.rb', line 275
# Sends a :recv_async term and returns the result of poll_request(false).
#
# @param timeout [Object, nil] defaults to @timeoutSync when nil
# @param trans_id [String, nil] defaults to sixteen zero bytes when nil
# @param consume [Boolean] passed through in the term unchanged
# @return [Object] whatever poll_request(false) returns
def recv_async(timeout=nil, trans_id=nil, consume=true)
  timeout = @timeoutSync if timeout.nil?
  trans_id = 0.chr * 16 if trans_id.nil?
  message = [:recv_async, timeout,
             OtpErlangBinary.new(trans_id), consume]
  send(term_to_binary(message))
  poll_request(false)
end
#request_http_qs_parse(request) ⇒ Object
583 584 585 |
# File 'lib/cloudi.rb', line 583
# Parses request by delegating to binary_key_value_parse.
#
# @param request [Object] value handed unchanged to binary_key_value_parse
# @return [Object] whatever binary_key_value_parse returns
def request_http_qs_parse(request)
  parsed = binary_key_value_parse(request)
  parsed
end
#return_(command, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/cloudi.rb', line 217
# Dispatches a response to return_async or return_sync based on command.
#
# @param command [Integer] ASYNC or SYNC
# @param name, pattern, response_info, response, timeout, trans_id, pid
#   passed through unchanged to the selected return method
# @raise [InvalidInputException] when command is neither ASYNC nor SYNC
#   (previously such a command was ignored and no response was sent)
def return_(command, name, pattern, response_info, response,
            timeout, trans_id, pid)
  case command
  when ASYNC
    return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
  when SYNC
    return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
  else
    # An unknown command would otherwise leave the request unanswered.
    raise InvalidInputException
  end
end
#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/cloudi.rb', line 229
# Sends a :return_async term and then raises ReturnAsyncException.
#
# When @requestTimeoutAdjustment is set and timeout equals @request_timeout,
# the milliseconds elapsed since @request_timer are subtracted from timeout.
# If more time has elapsed than the timeout allows, the response and its
# info are replaced by empty strings and the timeout becomes 0.
#
# @raise [ReturnAsyncException] always, after the term has been sent
def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
  if @requestTimeoutAdjustment && timeout == @request_timeout
    # Whole milliseconds spent so far; never negative.
    elapsed = ((Time.now - @request_timer) * 1000.0).floor
    elapsed = 0 if elapsed < 0
    if elapsed <= timeout
      timeout -= elapsed
    else
      response_info = ''
      response = ''
      timeout = 0
    end
  end
  message = [:return_async, name, pattern,
             OtpErlangBinary.new(response_info),
             OtpErlangBinary.new(response),
             timeout,
             OtpErlangBinary.new(trans_id), pid]
  send(term_to_binary(message))
  raise ReturnAsyncException.new()
end
#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/cloudi.rb', line 252
# Sends a :return_sync term and then raises ReturnSyncException.
#
# When @requestTimeoutAdjustment is set and timeout equals @request_timeout,
# the milliseconds elapsed since @request_timer are subtracted from timeout.
# If more time has elapsed than the timeout allows, the response and its
# info are replaced by empty strings and the timeout becomes 0.
#
# @raise [ReturnSyncException] always, after the term has been sent
def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
  if @requestTimeoutAdjustment && timeout == @request_timeout
    # Whole milliseconds spent so far; never negative.
    elapsed = ((Time.now - @request_timer) * 1000.0).floor
    elapsed = 0 if elapsed < 0
    if elapsed <= timeout
      timeout -= elapsed
    else
      response_info = ''
      response = ''
      timeout = 0
    end
  end
  message = [:return_sync, name, pattern,
             OtpErlangBinary.new(response_info),
             OtpErlangBinary.new(response),
             timeout,
             OtpErlangBinary.new(trans_id), pid]
  send(term_to_binary(message))
  raise ReturnSyncException.new()
end
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/cloudi.rb', line 109
# Sends a :send_async term and returns the result of poll_request(false).
#
# @param timeout [Object, nil] defaults to @timeoutAsync when nil
# @param request_info [String, nil] defaults to '' when nil
# @param priority [Object, nil] defaults to @priorityDefault when nil
# @return [Object] whatever poll_request(false) returns
def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
  timeout = @timeoutAsync if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priorityDefault if priority.nil?
  message = [:send_async, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(term_to_binary(message))
  poll_request(false)
end
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/cloudi.rb', line 127
# Sends a :send_sync term and returns the result of poll_request(false).
#
# @param timeout [Object, nil] defaults to @timeoutSync when nil
# @param request_info [String, nil] defaults to '' when nil
# @param priority [Object, nil] defaults to @priorityDefault when nil
# @return [Object] whatever poll_request(false) returns
def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
  timeout = @timeoutSync if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priorityDefault if priority.nil?
  message = [:send_sync, name,
             OtpErlangBinary.new(request_info),
             OtpErlangBinary.new(request),
             timeout, priority]
  send(term_to_binary(message))
  poll_request(false)
end
#subscribe(pattern, function) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/cloudi.rb', line 87
# Registers function under the key (@prefix + pattern) in @callbacks and
# sends a :subscribe term for the pattern.
#
# A key that has no entry yet gets a new one-element list; otherwise the
# function is appended to the existing list.
#
# @param pattern [String] appended to @prefix to form the callback key
# @param function [Object] the callback stored for that key
# @return [Object] whatever send returns
def subscribe(pattern, function)
  callback_key = @prefix + pattern
  registered = @callbacks.fetch(callback_key, nil)
  case registered
  when nil
    @callbacks[callback_key] = [function]
  else
    registered.push(function)
  end
  send(term_to_binary([:subscribe, pattern]))
end
#timeout_async ⇒ Object
307 308 309 |
# File 'lib/cloudi.rb', line 307
# Reader for @timeoutAsync (the default timeout send_async and mcast_async
# fall back to when none is given).
#
# @return [Object] the current value of @timeoutAsync
def timeout_async
  @timeoutAsync
end
#timeout_sync ⇒ Object
311 312 313 |
# File 'lib/cloudi.rb', line 311
# Reader for @timeoutSync (the default timeout send_sync and recv_async
# fall back to when none is given).
#
# @return [Object] the current value of @timeoutSync
def timeout_sync
  @timeoutSync
end
#unsubscribe(pattern) ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/cloudi.rb', line 98
# Removes the oldest function registered under (@prefix + pattern) and sends
# an :unsubscribe term for the pattern.
#
# The key is deleted from @callbacks once its list becomes empty.
#
# @param pattern [String] appended to @prefix to form the callback key
# @return [Object] whatever send returns
# @raise [RuntimeError] via API.assert when nothing is registered for the key
def unsubscribe(pattern)
  callback_key = @prefix + pattern
  registered = @callbacks.fetch(callback_key, nil)
  API.assert { !registered.nil? }
  registered.shift
  @callbacks.delete(callback_key) if registered.empty?
  send(term_to_binary([:unsubscribe, pattern]))
end