Class: CloudI::API
Constant Summary collapse
- ASYNC =
unbuffered output is with $stderr.puts ‘…’
1- SYNC =
-1
Instance Attribute Summary collapse
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#process_count ⇒ Object
readonly
Returns the value of attribute process_count.
-
#process_count_max ⇒ Object
readonly
Returns the value of attribute process_count_max.
-
#process_count_min ⇒ Object
readonly
Returns the value of attribute process_count_min.
-
#process_index ⇒ Object
readonly
Returns the value of attribute process_index.
-
#timeout_async ⇒ Object
readonly
Returns the value of attribute timeout_async.
-
#timeout_initialize ⇒ Object
readonly
Returns the value of attribute timeout_initialize.
-
#timeout_sync ⇒ Object
readonly
Returns the value of attribute timeout_sync.
-
#timeout_terminate ⇒ Object
readonly
Returns the value of attribute timeout_terminate.
Class Method Summary collapse
Instance Method Summary collapse
- #forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #info_key_value_parse(message_info) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll(timeout = nil) ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #subscribe_count(pattern) ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/cloudi.rb', line 43 def initialize(thread_index) protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL') buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE') if protocol == 'tcp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true elsif protocol == 'udp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = false elsif protocol == 'local' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true else raise InvalidInputException end @initialization_complete = false @terminate = false @size = buffer_size_str.to_i @callbacks = Hash.new @timeout_terminate = 1000 # TIMEOUT_TERMINATE_MIN send(Erlang.term_to_binary(:init)) poll_request(nil, false) end |
Instance Attribute Details
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
73 74 75 |
# File 'lib/cloudi.rb', line 73 def prefix @prefix end |
#process_count ⇒ Object (readonly)
Returns the value of attribute process_count.
70 71 72 |
# File 'lib/cloudi.rb', line 70 def process_count @process_count end |
#process_count_max ⇒ Object (readonly)
Returns the value of attribute process_count_max.
71 72 73 |
# File 'lib/cloudi.rb', line 71 def process_count_max @process_count_max end |
#process_count_min ⇒ Object (readonly)
Returns the value of attribute process_count_min.
72 73 74 |
# File 'lib/cloudi.rb', line 72 def process_count_min @process_count_min end |
#process_index ⇒ Object (readonly)
Returns the value of attribute process_index.
69 70 71 |
# File 'lib/cloudi.rb', line 69 def process_index @process_index end |
#timeout_async ⇒ Object (readonly)
Returns the value of attribute timeout_async.
75 76 77 |
# File 'lib/cloudi.rb', line 75 def timeout_async @timeout_async end |
#timeout_initialize ⇒ Object (readonly)
Returns the value of attribute timeout_initialize.
74 75 76 |
# File 'lib/cloudi.rb', line 74 def timeout_initialize @timeout_initialize end |
#timeout_sync ⇒ Object (readonly)
Returns the value of attribute timeout_sync.
76 77 78 |
# File 'lib/cloudi.rb', line 76 def timeout_sync @timeout_sync end |
#timeout_terminate ⇒ Object (readonly)
Returns the value of attribute timeout_terminate.
77 78 79 |
# File 'lib/cloudi.rb', line 77 def timeout_terminate @timeout_terminate end |
Class Method Details
.assert ⇒ Object
638 639 640 |
# File 'lib/cloudi.rb', line 638 def self.assert raise 'Assertion failed !' unless yield # if $DEBUG end |
.getenv(key) ⇒ Object
703 704 705 |
# File 'lib/cloudi.rb', line 703 def self.getenv(key) ENV[key] or raise InvalidInputException end |
.thread_count ⇒ Object
79 80 81 82 |
# File 'lib/cloudi.rb', line 79 def self.thread_count s = getenv('CLOUDI_API_INIT_THREAD_COUNT') s.to_i end |
Instance Method Details
#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/cloudi.rb', line 165 def forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) case request_type when ASYNC forward_async(name, request_info, request, timeout, priority, trans_id, pid) when SYNC forward_sync(name, request_info, request, timeout, priority, trans_id, pid) end end |
#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
177 178 179 180 181 182 183 184 185 |
# File 'lib/cloudi.rb', line 177 def forward_async(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardAsyncException.new() end |
#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
187 188 189 190 191 192 193 194 195 |
# File 'lib/cloudi.rb', line 187 def forward_sync(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardSyncException.new() end |
#info_key_value_parse(message_info) ⇒ Object
634 635 636 |
# File 'lib/cloudi.rb', line 634 def info_key_value_parse() return text_key_value_parse() end |
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/cloudi.rb', line 147 def mcast_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:mcast_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#poll(timeout = nil) ⇒ Object
611 612 613 614 615 616 |
# File 'lib/cloudi.rb', line 611 def poll(timeout=nil) if timeout.nil? timeout = -1 end return poll_request(timeout, true) end |
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/cloudi.rb', line 229 def recv_async(timeout=nil, trans_id=nil, consume=true) if timeout.nil? timeout = @timeout_sync end if trans_id.nil? trans_id = 0.chr * 16 end send(Erlang.term_to_binary([:recv_async, timeout, OtpErlangBinary.new(trans_id), consume])) return poll_request(nil, false) end |
#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/cloudi.rb', line 197 def return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) case request_type when ASYNC return_async(name, pattern, response_info, response, timeout, trans_id, pid) when SYNC return_sync(name, pattern, response_info, response, timeout, trans_id, pid) end end |
#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
209 210 211 212 213 214 215 216 217 |
# File 'lib/cloudi.rb', line 209 def return_async(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_async, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnAsyncException.new() end |
#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
219 220 221 222 223 224 225 226 227 |
# File 'lib/cloudi.rb', line 219 def return_sync(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_sync, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnSyncException.new() end |
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/cloudi.rb', line 111 def send_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/cloudi.rb', line 129 def send_sync(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_sync end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#subscribe(pattern, function) ⇒ Object
84 85 86 87 88 89 90 91 92 93 |
# File 'lib/cloudi.rb', line 84 def subscribe(pattern, function) key = @prefix + pattern value = @callbacks.fetch(key, nil) if value.nil? @callbacks[key] = [function] else value.push(function) end send(Erlang.term_to_binary([:subscribe, pattern])) end |
#subscribe_count(pattern) ⇒ Object
95 96 97 98 |
# File 'lib/cloudi.rb', line 95 def subscribe_count(pattern) send(Erlang.term_to_binary([:subscribe_count, pattern])) return poll_request(nil, false) end |
#unsubscribe(pattern) ⇒ Object
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/cloudi.rb', line 100 def unsubscribe(pattern) key = @prefix + pattern value = @callbacks.fetch(key, nil) API.assert{value != nil} value.shift if value.empty? @callbacks.delete(key) end send(Erlang.term_to_binary([:unsubscribe, pattern])) end |