Class: CloudI::API
Constant Summary collapse
- ASYNC = 1
  (source comment attached to this constant: unbuffered output is with $stderr.puts ‘…’)
- SYNC = -1
Instance Attribute Summary collapse
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#priority_default ⇒ Object
readonly
Returns the value of attribute priority_default.
-
#process_count ⇒ Object
readonly
Returns the value of attribute process_count.
-
#process_count_max ⇒ Object
readonly
Returns the value of attribute process_count_max.
-
#process_count_min ⇒ Object
readonly
Returns the value of attribute process_count_min.
-
#process_index ⇒ Object
readonly
Returns the value of attribute process_index.
-
#timeout_async ⇒ Object
readonly
Returns the value of attribute timeout_async.
-
#timeout_initialize ⇒ Object
readonly
Returns the value of attribute timeout_initialize.
-
#timeout_sync ⇒ Object
readonly
Returns the value of attribute timeout_sync.
-
#timeout_terminate ⇒ Object
readonly
Returns the value of attribute timeout_terminate.
Class Method Summary collapse
- .assert ⇒ Object
- .info_key_value_new(pairs, response = true) ⇒ Object
- .info_key_value_parse(info) ⇒ Object
- .thread_count ⇒ Object
Instance Method Summary collapse
- #forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll(timeout = nil) ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #shutdown(reason = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #subscribe_count(pattern) ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
Returns a new instance of API.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/cloudi.rb', line 44
# Builds an API instance on top of the descriptor numbered thread_index + 3.
#
# The protocol is read from CLOUDI_API_INIT_PROTOCOL and the buffer size from
# CLOUDI_API_INIT_BUFFER_SIZE (both through API.getenv). 'tcp' and 'local'
# enable @use_header, 'udp' disables it; any other value prints a message on
# $stderr and raises InvalidInputException before the descriptor is opened.
# After the state is set up, an :init term is sent and poll_request is called
# once.
def initialize(thread_index)
  protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
  buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
  use_header = case protocol
               when 'tcp', 'local'
                 true
               when 'udp'
                 false
               else
                 $stderr.puts 'CloudI service execution must occur in CloudI'
                 raise InvalidInputException
               end
  # Every supported protocol wraps the same descriptor the same way:
  # read/write, never closed automatically, unbuffered writes.
  @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
  @s.sync = true
  @use_header = use_header
  @initialization_complete = false
  @terminate = false
  @size = buffer_size_str.to_i
  @callbacks = {}
  @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
  send(Erlang.term_to_binary(:init))
  poll_request(nil, false)
end
Instance Attribute Details
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
75 76 77 |
# File 'lib/cloudi.rb', line 75
# Reader: hands back the current value of @prefix unchanged.
def prefix
  return @prefix
end
#priority_default ⇒ Object (readonly)
Returns the value of attribute priority_default.
80 81 82 |
# File 'lib/cloudi.rb', line 80
# Reader: hands back the current value of @priority_default unchanged.
def priority_default
  return @priority_default
end
#process_count ⇒ Object (readonly)
Returns the value of attribute process_count.
72 73 74 |
# File 'lib/cloudi.rb', line 72
# Reader: hands back the current value of @process_count unchanged.
def process_count
  return @process_count
end
#process_count_max ⇒ Object (readonly)
Returns the value of attribute process_count_max.
73 74 75 |
# File 'lib/cloudi.rb', line 73
# Reader: hands back the current value of @process_count_max unchanged.
def process_count_max
  return @process_count_max
end
#process_count_min ⇒ Object (readonly)
Returns the value of attribute process_count_min.
74 75 76 |
# File 'lib/cloudi.rb', line 74
# Reader: hands back the current value of @process_count_min unchanged.
def process_count_min
  return @process_count_min
end
#process_index ⇒ Object (readonly)
Returns the value of attribute process_index.
71 72 73 |
# File 'lib/cloudi.rb', line 71
# Reader: hands back the current value of @process_index unchanged.
def process_index
  return @process_index
end
#timeout_async ⇒ Object (readonly)
Returns the value of attribute timeout_async.
77 78 79 |
# File 'lib/cloudi.rb', line 77
# Reader: hands back the current value of @timeout_async unchanged.
def timeout_async
  return @timeout_async
end
#timeout_initialize ⇒ Object (readonly)
Returns the value of attribute timeout_initialize.
76 77 78 |
# File 'lib/cloudi.rb', line 76
# Reader: hands back the current value of @timeout_initialize unchanged.
def timeout_initialize
  return @timeout_initialize
end
#timeout_sync ⇒ Object (readonly)
Returns the value of attribute timeout_sync.
78 79 80 |
# File 'lib/cloudi.rb', line 78
# Reader: hands back the current value of @timeout_sync unchanged.
def timeout_sync
  return @timeout_sync
end
#timeout_terminate ⇒ Object (readonly)
Returns the value of attribute timeout_terminate.
79 80 81 |
# File 'lib/cloudi.rb', line 79
# Reader: hands back the current value of @timeout_terminate unchanged.
def timeout_terminate
  return @timeout_terminate
end
Class Method Details
.assert ⇒ Object
699 700 701 |
# File 'lib/cloudi.rb', line 699
# Evaluates the given block and raises AssertionError when the block's
# result is nil or false; otherwise returns nil.
def self.assert
  return if yield # if $DEBUG

  raise AssertionError
end
.info_key_value_new(pairs, response = true) ⇒ Object
695 696 697 |
# File 'lib/cloudi.rb', line 695
# Thin alias: passes +pairs+ and +response+ straight to text_pairs_new and
# returns its result.
def self.info_key_value_new(pairs, response = true)
  text_pairs_new(pairs, response)
end
.info_key_value_parse(info) ⇒ Object
691 692 693 |
# File 'lib/cloudi.rb', line 691
# Thin alias: passes +info+ straight to text_pairs_parse and returns its
# result.
def self.info_key_value_parse(info)
  text_pairs_parse(info)
end
.thread_count ⇒ Object
82 83 84 85 |
# File 'lib/cloudi.rb', line 82
# Reads CLOUDI_API_INIT_THREAD_COUNT through getenv and converts the result
# with #to_i.
def self.thread_count
  getenv('CLOUDI_API_INIT_THREAD_COUNT').to_i
end
Instance Method Details
#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/cloudi.rb', line 168
# Dispatches a forward to forward_async or forward_sync based on
# +request_type+, passing the remaining arguments through unchanged.
#
# request_type - ASYNC or SYNC.
#
# Raises InvalidInputException when +request_type+ is neither ASYNC nor SYNC.
# Previously such a value fell through the case and returned nil, so nothing
# was forwarded and the caller got no indication of the mistake.
def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
  case request_type
  when ASYNC
    forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
  when SYNC
    forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
  else
    raise InvalidInputException
  end
end
#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
180 181 182 183 184 185 186 187 188 |
# File 'lib/cloudi.rb', line 180
# Encodes a :forward_async term (request_info, request and trans_id wrapped
# in OtpErlangBinary), sends it, and then always raises
# ForwardAsyncException; the method never returns normally.
def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
  term = [
    :forward_async,
    name,
    OtpErlangBinary.new(request_info),
    OtpErlangBinary.new(request),
    timeout,
    priority,
    OtpErlangBinary.new(trans_id),
    pid
  ]
  send(Erlang.term_to_binary(term))
  raise ForwardAsyncException
end
#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
190 191 192 193 194 195 196 197 198 |
# File 'lib/cloudi.rb', line 190
# Encodes a :forward_sync term (request_info, request and trans_id wrapped
# in OtpErlangBinary), sends it, and then always raises
# ForwardSyncException; the method never returns normally.
def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
  term = [
    :forward_sync,
    name,
    OtpErlangBinary.new(request_info),
    OtpErlangBinary.new(request),
    timeout,
    priority,
    OtpErlangBinary.new(trans_id),
    pid
  ]
  send(Erlang.term_to_binary(term))
  raise ForwardSyncException
end
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/cloudi.rb', line 150
# Sends a :mcast_async term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments are replaced before sending: timeout by @timeout_async,
# request_info by '' and priority by @priority_default.
def mcast_async(name, request, timeout = nil, request_info = nil, priority = nil)
  timeout = @timeout_async if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  term = [
    :mcast_async,
    name,
    OtpErlangBinary.new(request_info),
    OtpErlangBinary.new(request),
    timeout,
    priority
  ]
  send(Erlang.term_to_binary(term))
  poll_request(nil, false)
end
#poll(timeout = nil) ⇒ Object
642 643 644 645 646 647 |
# File 'lib/cloudi.rb', line 642
# Calls poll_request(timeout, true) and returns its result; a nil +timeout+
# is passed on as -1.
def poll(timeout = nil)
  poll_request(timeout.nil? ? -1 : timeout, true)
end
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/cloudi.rb', line 232
# Sends a :recv_async term and returns the result of
# poll_request(nil, false).
#
# A nil +timeout+ becomes @timeout_sync; a nil +trans_id+ becomes sixteen
# zero bytes.
def recv_async(timeout = nil, trans_id = nil, consume = true)
  timeout = @timeout_sync if timeout.nil?
  trans_id = 0.chr * 16 if trans_id.nil?
  term = [:recv_async, timeout, OtpErlangBinary.new(trans_id), consume]
  send(Erlang.term_to_binary(term))
  poll_request(nil, false)
end
#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/cloudi.rb', line 200
# Dispatches a response to return_async or return_sync based on
# +request_type+, passing the remaining arguments through unchanged.
#
# request_type - ASYNC or SYNC.
#
# Raises InvalidInputException when +request_type+ is neither ASYNC nor SYNC.
# Previously such a value fell through the case and returned nil, so no
# response was sent and the caller got no indication of the mistake.
def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
  case request_type
  when ASYNC
    return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
  when SYNC
    return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
  else
    raise InvalidInputException
  end
end
#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
212 213 214 215 216 217 218 219 220 |
# File 'lib/cloudi.rb', line 212
# Encodes a :return_async term (response_info, response and trans_id wrapped
# in OtpErlangBinary), sends it, and then always raises
# ReturnAsyncException; the method never returns normally.
def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
  term = [
    :return_async,
    name,
    pattern,
    OtpErlangBinary.new(response_info),
    OtpErlangBinary.new(response),
    timeout,
    OtpErlangBinary.new(trans_id),
    pid
  ]
  send(Erlang.term_to_binary(term))
  raise ReturnAsyncException
end
#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
222 223 224 225 226 227 228 229 230 |
# File 'lib/cloudi.rb', line 222
# Encodes a :return_sync term (response_info, response and trans_id wrapped
# in OtpErlangBinary), sends it, and then always raises
# ReturnSyncException; the method never returns normally.
def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
  term = [
    :return_sync,
    name,
    pattern,
    OtpErlangBinary.new(response_info),
    OtpErlangBinary.new(response),
    timeout,
    OtpErlangBinary.new(trans_id),
    pid
  ]
  send(Erlang.term_to_binary(term))
  raise ReturnSyncException
end
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/cloudi.rb', line 114
# Sends a :send_async term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments are replaced before sending: timeout by @timeout_async,
# request_info by '' and priority by @priority_default.
def send_async(name, request, timeout = nil, request_info = nil, priority = nil)
  timeout = @timeout_async if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  term = [
    :send_async,
    name,
    OtpErlangBinary.new(request_info),
    OtpErlangBinary.new(request),
    timeout,
    priority
  ]
  send(Erlang.term_to_binary(term))
  poll_request(nil, false)
end
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/cloudi.rb', line 132
# Sends a :send_sync term for +name+ and returns the result of
# poll_request(nil, false).
#
# nil arguments are replaced before sending: timeout by @timeout_sync,
# request_info by '' and priority by @priority_default.
def send_sync(name, request, timeout = nil, request_info = nil, priority = nil)
  timeout = @timeout_sync if timeout.nil?
  request_info = '' if request_info.nil?
  priority = @priority_default if priority.nil?
  term = [
    :send_sync,
    name,
    OtpErlangBinary.new(request_info),
    OtpErlangBinary.new(request),
    timeout,
    priority
  ]
  send(Erlang.term_to_binary(term))
  poll_request(nil, false)
end
#shutdown(reason = nil) ⇒ Object
649 650 651 652 653 654 |
# File 'lib/cloudi.rb', line 649
# Sends a [:shutdown, reason] term; a nil +reason+ is sent as the empty
# string. No reply is polled for here.
def shutdown(reason = nil)
  reason = '' if reason.nil?
  send(Erlang.term_to_binary([:shutdown, reason]))
end
#subscribe(pattern, function) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/cloudi.rb', line 87
# Registers +function+ under the key @prefix + pattern in @callbacks
# (appending when the key already has a list, otherwise starting a new
# one-element list) and then sends a [:subscribe, pattern] term.
def subscribe(pattern, function)
  key = @prefix + pattern
  functions = @callbacks.fetch(key, nil)
  if functions.nil?
    @callbacks[key] = [function]
  else
    functions << function
  end
  send(Erlang.term_to_binary([:subscribe, pattern]))
end
#subscribe_count(pattern) ⇒ Object
98 99 100 101 |
# File 'lib/cloudi.rb', line 98
# Sends a [:subscribe_count, pattern] term and returns the result of
# poll_request(nil, false).
def subscribe_count(pattern)
  message = Erlang.term_to_binary([:subscribe_count, pattern])
  send(message)
  poll_request(nil, false)
end
#unsubscribe(pattern) ⇒ Object
103 104 105 106 107 108 109 110 111 112 |
# File 'lib/cloudi.rb', line 103
# Drops the oldest callback stored under @prefix + pattern, removes the key
# from @callbacks once its list is empty, and then sends an
# [:unsubscribe, pattern] term.
#
# API.assert fails (AssertionError) when nothing is registered for the key.
def unsubscribe(pattern)
  key = @prefix + pattern
  functions = @callbacks.fetch(key, nil)
  API.assert { !functions.nil? }
  functions.shift
  @callbacks.delete(key) if functions.empty?
  send(Erlang.term_to_binary([:unsubscribe, pattern]))
end