Class: CloudI::API
Constant Summary collapse
- ASYNC =
unbuffered output is with $stderr.puts ‘…’
1
- SYNC =
-1
Instance Attribute Summary collapse
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#priority_default ⇒ Object
readonly
Returns the value of attribute priority_default.
-
#process_count ⇒ Object
readonly
Returns the value of attribute process_count.
-
#process_count_max ⇒ Object
readonly
Returns the value of attribute process_count_max.
-
#process_count_min ⇒ Object
readonly
Returns the value of attribute process_count_min.
-
#process_index ⇒ Object
readonly
Returns the value of attribute process_index.
-
#timeout_async ⇒ Object
readonly
Returns the value of attribute timeout_async.
-
#timeout_initialize ⇒ Object
readonly
Returns the value of attribute timeout_initialize.
-
#timeout_sync ⇒ Object
readonly
Returns the value of attribute timeout_sync.
-
#timeout_terminate ⇒ Object
readonly
Returns the value of attribute timeout_terminate.
Class Method Summary collapse
- .assert ⇒ Object
- .info_key_value_new(pairs, response = true) ⇒ Object
- .info_key_value_parse(info) ⇒ Object
- .process_count_max_ ⇒ Object
- .process_count_min_ ⇒ Object
- .process_index_ ⇒ Object
- .thread_count ⇒ Object
- .timeout_initialize_ ⇒ Object
- .timeout_terminate_ ⇒ Object
Instance Method Summary collapse
- #forward_(request_type, name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll(timeout = nil) ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #return_(request_type, name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #shutdown(reason = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #subscribe_count(pattern) ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
Returns a new instance of API.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/cloudi.rb', line 44 def initialize(thread_index) protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL') buffer_size = API.getenv_to_uint('CLOUDI_API_INIT_BUFFER_SIZE') if protocol == 'tcp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true elsif protocol == 'udp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = false elsif protocol == 'local' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true else $stderr.puts 'CloudI service execution must occur in CloudI' raise InvalidInputException end @initialization_complete = false @terminate = false @size = buffer_size @callbacks = Hash.new @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN send(Erlang.term_to_binary(:init)) poll_request(nil, false) end |
Instance Attribute Details
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
75 76 77 |
# File 'lib/cloudi.rb', line 75 def prefix @prefix end |
#priority_default ⇒ Object (readonly)
Returns the value of attribute priority_default.
80 81 82 |
# File 'lib/cloudi.rb', line 80 def priority_default @priority_default end |
#process_count ⇒ Object (readonly)
Returns the value of attribute process_count.
72 73 74 |
# File 'lib/cloudi.rb', line 72 def process_count @process_count end |
#process_count_max ⇒ Object (readonly)
Returns the value of attribute process_count_max.
73 74 75 |
# File 'lib/cloudi.rb', line 73 def process_count_max @process_count_max end |
#process_count_min ⇒ Object (readonly)
Returns the value of attribute process_count_min.
74 75 76 |
# File 'lib/cloudi.rb', line 74 def process_count_min @process_count_min end |
#process_index ⇒ Object (readonly)
Returns the value of attribute process_index.
71 72 73 |
# File 'lib/cloudi.rb', line 71 def process_index @process_index end |
#timeout_async ⇒ Object (readonly)
Returns the value of attribute timeout_async.
77 78 79 |
# File 'lib/cloudi.rb', line 77 def timeout_async @timeout_async end |
#timeout_initialize ⇒ Object (readonly)
Returns the value of attribute timeout_initialize.
76 77 78 |
# File 'lib/cloudi.rb', line 76 def timeout_initialize @timeout_initialize end |
#timeout_sync ⇒ Object (readonly)
Returns the value of attribute timeout_sync.
78 79 80 |
# File 'lib/cloudi.rb', line 78 def timeout_sync @timeout_sync end |
#timeout_terminate ⇒ Object (readonly)
Returns the value of attribute timeout_terminate.
79 80 81 |
# File 'lib/cloudi.rb', line 79 def timeout_terminate @timeout_terminate end |
Class Method Details
.assert ⇒ Object
737 738 739 |
# File 'lib/cloudi.rb', line 737 def self.assert raise AssertionError unless yield # if $DEBUG end |
.info_key_value_new(pairs, response = true) ⇒ Object
733 734 735 |
# File 'lib/cloudi.rb', line 733 def self.info_key_value_new(pairs, response = true) return text_pairs_new(pairs, response) end |
.info_key_value_parse(info) ⇒ Object
729 730 731 |
# File 'lib/cloudi.rb', line 729 def self.info_key_value_parse(info) return text_pairs_parse(info) end |
.process_count_max_ ⇒ Object
252 253 254 |
# File 'lib/cloudi.rb', line 252 def self.process_count_max_ return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_COUNT_MAX') end |
.process_count_min_ ⇒ Object
256 257 258 |
# File 'lib/cloudi.rb', line 256 def self.process_count_min_ return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_COUNT_MIN') end |
.process_index_ ⇒ Object
248 249 250 |
# File 'lib/cloudi.rb', line 248 def self.process_index_ return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_INDEX') end |
.thread_count ⇒ Object
82 83 84 |
# File 'lib/cloudi.rb', line 82 def self.thread_count return API.getenv_to_uint('CLOUDI_API_INIT_THREAD_COUNT') end |
.timeout_initialize_ ⇒ Object
260 261 262 |
# File 'lib/cloudi.rb', line 260 def self.timeout_initialize_ return API.getenv_to_uint('CLOUDI_API_INIT_TIMEOUT_INITIALIZE') end |
.timeout_terminate_ ⇒ Object
264 265 266 |
# File 'lib/cloudi.rb', line 264 def self.timeout_terminate_ return API.getenv_to_uint('CLOUDI_API_INIT_TIMEOUT_TERMINATE') end |
Instance Method Details
#forward_(request_type, name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/cloudi.rb', line 167 def forward_(request_type, name, request_info, request, timeout, priority, trans_id, source) case request_type when ASYNC forward_async(name, request_info, request, timeout, priority, trans_id, source) when SYNC forward_sync(name, request_info, request, timeout, priority, trans_id, source) end end |
#forward_async(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
179 180 181 182 183 184 185 186 187 188 |
# File 'lib/cloudi.rb', line 179 def forward_async(name, request_info, request, timeout, priority, trans_id, source) send(Erlang.term_to_binary([:forward_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), source])) raise ForwardAsyncException end |
#forward_sync(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/cloudi.rb', line 190 def forward_sync(name, request_info, request, timeout, priority, trans_id, source) send(Erlang.term_to_binary([:forward_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), source])) raise ForwardSyncException end |
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/cloudi.rb', line 149 def mcast_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:mcast_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#poll(timeout = nil) ⇒ Object
680 681 682 683 684 685 |
# File 'lib/cloudi.rb', line 680 def poll(timeout=nil) if timeout.nil? timeout = -1 end return poll_request(timeout, true) end |
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/cloudi.rb', line 235 def recv_async(timeout=nil, trans_id=nil, consume=true) if timeout.nil? timeout = @timeout_sync end if trans_id.nil? trans_id = 0.chr * 16 end send(Erlang.term_to_binary([:recv_async, timeout, OtpErlangBinary.new(trans_id), consume])) return poll_request(nil, false) end |
#return_(request_type, name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/cloudi.rb', line 201 def return_(request_type, name, pattern, response_info, response, timeout, trans_id, source) case request_type when ASYNC return_async(name, pattern, response_info, response, timeout, trans_id, source) when SYNC return_sync(name, pattern, response_info, response, timeout, trans_id, source) end end |
#return_async(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
213 214 215 216 217 218 219 220 221 222 |
# File 'lib/cloudi.rb', line 213 def return_async(name, pattern, response_info, response, timeout, trans_id, source) send(Erlang.term_to_binary([:return_async, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), source])) raise ReturnAsyncException end |
#return_sync(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object
224 225 226 227 228 229 230 231 232 233 |
# File 'lib/cloudi.rb', line 224 def return_sync(name, pattern, response_info, response, timeout, trans_id, source) send(Erlang.term_to_binary([:return_sync, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), source])) raise ReturnSyncException end |
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/cloudi.rb', line 113 def send_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/cloudi.rb', line 131 def send_sync(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_sync end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#shutdown(reason = nil) ⇒ Object
687 688 689 690 691 692 |
# File 'lib/cloudi.rb', line 687 def shutdown(reason=nil) if reason.nil? reason = '' end send(Erlang.term_to_binary([:shutdown, reason])) end |
#subscribe(pattern, function) ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/cloudi.rb', line 86 def subscribe(pattern, function) key = @prefix + pattern value = @callbacks.fetch(key, nil) if value.nil? @callbacks[key] = [function] else value.push(function) end send(Erlang.term_to_binary([:subscribe, pattern])) end |
#subscribe_count(pattern) ⇒ Object
97 98 99 100 |
# File 'lib/cloudi.rb', line 97 def subscribe_count(pattern) send(Erlang.term_to_binary([:subscribe_count, pattern])) return poll_request(nil, false) end |
#unsubscribe(pattern) ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/cloudi.rb', line 102 def unsubscribe(pattern) key = @prefix + pattern value = @callbacks.fetch(key, nil) API.assert{value != nil} value.shift if value.empty? @callbacks.delete(key) end send(Erlang.term_to_binary([:unsubscribe, pattern])) end |