Class: CloudI::API
Constant Summary collapse
- ASYNC =
unbuffered output is with $stderr.puts ‘…’
1- SYNC =
-1
Instance Attribute Summary collapse
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#process_count ⇒ Object
readonly
Returns the value of attribute process_count.
-
#process_count_max ⇒ Object
readonly
Returns the value of attribute process_count_max.
-
#process_count_min ⇒ Object
readonly
Returns the value of attribute process_count_min.
-
#process_index ⇒ Object
readonly
Returns the value of attribute process_index.
-
#timeout_async ⇒ Object
readonly
Returns the value of attribute timeout_async.
-
#timeout_initialize ⇒ Object
readonly
Returns the value of attribute timeout_initialize.
-
#timeout_sync ⇒ Object
readonly
Returns the value of attribute timeout_sync.
-
#timeout_terminate ⇒ Object
readonly
Returns the value of attribute timeout_terminate.
Class Method Summary collapse
- .assert ⇒ Object
- .info_key_value_new(pairs, response = true) ⇒ Object
- .info_key_value_parse(info) ⇒ Object
- .thread_count ⇒ Object
Instance Method Summary collapse
- #forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll(timeout = nil) ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #shutdown(reason = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #subscribe_count(pattern) ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/cloudi.rb', line 44 def initialize(thread_index) protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL') buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE') if protocol == 'tcp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true elsif protocol == 'udp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = false elsif protocol == 'local' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true else raise InvalidInputException end @initialization_complete = false @terminate = false @size = buffer_size_str.to_i @callbacks = Hash.new @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN send(Erlang.term_to_binary(:init)) poll_request(nil, false) end |
Instance Attribute Details
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
74 75 76 |
# File 'lib/cloudi.rb', line 74 def prefix @prefix end |
#process_count ⇒ Object (readonly)
Returns the value of attribute process_count.
71 72 73 |
# File 'lib/cloudi.rb', line 71 def process_count @process_count end |
#process_count_max ⇒ Object (readonly)
Returns the value of attribute process_count_max.
72 73 74 |
# File 'lib/cloudi.rb', line 72 def process_count_max @process_count_max end |
#process_count_min ⇒ Object (readonly)
Returns the value of attribute process_count_min.
73 74 75 |
# File 'lib/cloudi.rb', line 73 def process_count_min @process_count_min end |
#process_index ⇒ Object (readonly)
Returns the value of attribute process_index.
70 71 72 |
# File 'lib/cloudi.rb', line 70 def process_index @process_index end |
#timeout_async ⇒ Object (readonly)
Returns the value of attribute timeout_async.
76 77 78 |
# File 'lib/cloudi.rb', line 76 def timeout_async @timeout_async end |
#timeout_initialize ⇒ Object (readonly)
Returns the value of attribute timeout_initialize.
75 76 77 |
# File 'lib/cloudi.rb', line 75 def timeout_initialize @timeout_initialize end |
#timeout_sync ⇒ Object (readonly)
Returns the value of attribute timeout_sync.
77 78 79 |
# File 'lib/cloudi.rb', line 77 def timeout_sync @timeout_sync end |
#timeout_terminate ⇒ Object (readonly)
Returns the value of attribute timeout_terminate.
78 79 80 |
# File 'lib/cloudi.rb', line 78 def timeout_terminate @timeout_terminate end |
Class Method Details
.assert ⇒ Object
681 682 683 |
# File 'lib/cloudi.rb', line 681 def self.assert raise 'Assertion failed !' unless yield # if $DEBUG end |
.info_key_value_new(pairs, response = true) ⇒ Object
677 678 679 |
# File 'lib/cloudi.rb', line 677 def self.info_key_value_new(pairs, response = true) return text_pairs_new(pairs, response) end |
.info_key_value_parse(info) ⇒ Object
673 674 675 |
# File 'lib/cloudi.rb', line 673 def self.info_key_value_parse(info) return text_pairs_parse(info) end |
.thread_count ⇒ Object
80 81 82 83 |
# File 'lib/cloudi.rb', line 80 def self.thread_count s = getenv('CLOUDI_API_INIT_THREAD_COUNT') s.to_i end |
Instance Method Details
#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/cloudi.rb', line 166 def forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) case request_type when ASYNC forward_async(name, request_info, request, timeout, priority, trans_id, pid) when SYNC forward_sync(name, request_info, request, timeout, priority, trans_id, pid) end end |
#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/cloudi.rb', line 178 def forward_async(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardAsyncException.new() end |
#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
188 189 190 191 192 193 194 195 196 |
# File 'lib/cloudi.rb', line 188 def forward_sync(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardSyncException.new() end |
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/cloudi.rb', line 148 def mcast_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:mcast_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#poll(timeout = nil) ⇒ Object
624 625 626 627 628 629 |
# File 'lib/cloudi.rb', line 624 def poll(timeout=nil) if timeout.nil? timeout = -1 end return poll_request(timeout, true) end |
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'lib/cloudi.rb', line 230 def recv_async(timeout=nil, trans_id=nil, consume=true) if timeout.nil? timeout = @timeout_sync end if trans_id.nil? trans_id = 0.chr * 16 end send(Erlang.term_to_binary([:recv_async, timeout, OtpErlangBinary.new(trans_id), consume])) return poll_request(nil, false) end |
#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/cloudi.rb', line 198 def return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) case request_type when ASYNC return_async(name, pattern, response_info, response, timeout, trans_id, pid) when SYNC return_sync(name, pattern, response_info, response, timeout, trans_id, pid) end end |
#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
210 211 212 213 214 215 216 217 218 |
# File 'lib/cloudi.rb', line 210 def return_async(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_async, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnAsyncException.new() end |
#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
220 221 222 223 224 225 226 227 228 |
# File 'lib/cloudi.rb', line 220 def return_sync(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_sync, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnSyncException.new() end |
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/cloudi.rb', line 112 def send_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/cloudi.rb', line 130 def send_sync(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_sync end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#shutdown(reason = nil) ⇒ Object
631 632 633 634 635 636 |
# File 'lib/cloudi.rb', line 631 def shutdown(reason=nil) if reason.nil? reason = '' end send(Erlang.term_to_binary([:shutdown, reason])) end |
#subscribe(pattern, function) ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/cloudi.rb', line 85 def subscribe(pattern, function) key = @prefix + pattern value = @callbacks.fetch(key, nil) if value.nil? @callbacks[key] = [function] else value.push(function) end send(Erlang.term_to_binary([:subscribe, pattern])) end |
#subscribe_count(pattern) ⇒ Object
96 97 98 99 |
# File 'lib/cloudi.rb', line 96 def subscribe_count(pattern) send(Erlang.term_to_binary([:subscribe_count, pattern])) return poll_request(nil, false) end |
#unsubscribe(pattern) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/cloudi.rb', line 101 def unsubscribe(pattern) key = @prefix + pattern value = @callbacks.fetch(key, nil) API.assert{value != nil} value.shift if value.empty? @callbacks.delete(key) end send(Erlang.term_to_binary([:unsubscribe, pattern])) end |