Class: CloudI::API
Constant Summary collapse
- ASYNC =
unbuffered output is with $stderr.puts ‘…’
1- SYNC =
-1
Instance Attribute Summary collapse
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
-
#process_count ⇒ Object
readonly
Returns the value of attribute process_count.
-
#process_count_max ⇒ Object
readonly
Returns the value of attribute process_count_max.
-
#process_count_min ⇒ Object
readonly
Returns the value of attribute process_count_min.
-
#process_index ⇒ Object
readonly
Returns the value of attribute process_index.
-
#timeout_async ⇒ Object
readonly
Returns the value of attribute timeout_async.
-
#timeout_initialize ⇒ Object
readonly
Returns the value of attribute timeout_initialize.
-
#timeout_sync ⇒ Object
readonly
Returns the value of attribute timeout_sync.
-
#timeout_terminate ⇒ Object
readonly
Returns the value of attribute timeout_terminate.
Class Method Summary collapse
- .assert ⇒ Object
- .info_key_value_new(pairs, response = true) ⇒ Object
- .info_key_value_parse(info) ⇒ Object
- .thread_count ⇒ Object
Instance Method Summary collapse
- #forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
- #forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
-
#initialize(thread_index) ⇒ API
constructor
A new instance of API.
- #mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #poll(timeout = nil) ⇒ Object
- #recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
- #return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
- #send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
- #shutdown(reason = nil) ⇒ Object
- #subscribe(pattern, function) ⇒ Object
- #subscribe_count(pattern) ⇒ Object
- #unsubscribe(pattern) ⇒ Object
Constructor Details
#initialize(thread_index) ⇒ API
Returns a new instance of API.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/cloudi.rb', line 44 def initialize(thread_index) protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL') buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE') if protocol == 'tcp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true elsif protocol == 'udp' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = false elsif protocol == 'local' @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false) @s.sync = true @use_header = true else $stderr.puts 'CloudI service execution must occur in CloudI' raise InvalidInputException end @initialization_complete = false @terminate = false @size = buffer_size_str.to_i @callbacks = Hash.new @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN send(Erlang.term_to_binary(:init)) poll_request(nil, false) end |
Instance Attribute Details
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
75 76 77 |
# File 'lib/cloudi.rb', line 75 def prefix @prefix end |
#process_count ⇒ Object (readonly)
Returns the value of attribute process_count.
72 73 74 |
# File 'lib/cloudi.rb', line 72 def process_count @process_count end |
#process_count_max ⇒ Object (readonly)
Returns the value of attribute process_count_max.
73 74 75 |
# File 'lib/cloudi.rb', line 73 def process_count_max @process_count_max end |
#process_count_min ⇒ Object (readonly)
Returns the value of attribute process_count_min.
74 75 76 |
# File 'lib/cloudi.rb', line 74 def process_count_min @process_count_min end |
#process_index ⇒ Object (readonly)
Returns the value of attribute process_index.
71 72 73 |
# File 'lib/cloudi.rb', line 71 def process_index @process_index end |
#timeout_async ⇒ Object (readonly)
Returns the value of attribute timeout_async.
77 78 79 |
# File 'lib/cloudi.rb', line 77 def timeout_async @timeout_async end |
#timeout_initialize ⇒ Object (readonly)
Returns the value of attribute timeout_initialize.
76 77 78 |
# File 'lib/cloudi.rb', line 76 def timeout_initialize @timeout_initialize end |
#timeout_sync ⇒ Object (readonly)
Returns the value of attribute timeout_sync.
78 79 80 |
# File 'lib/cloudi.rb', line 78 def timeout_sync @timeout_sync end |
#timeout_terminate ⇒ Object (readonly)
Returns the value of attribute timeout_terminate.
79 80 81 |
# File 'lib/cloudi.rb', line 79 def timeout_terminate @timeout_terminate end |
Class Method Details
.assert ⇒ Object
698 699 700 |
# File 'lib/cloudi.rb', line 698 def self.assert raise AssertionError unless yield # if $DEBUG end |
.info_key_value_new(pairs, response = true) ⇒ Object
694 695 696 |
# File 'lib/cloudi.rb', line 694 def self.info_key_value_new(pairs, response = true) return text_pairs_new(pairs, response) end |
.info_key_value_parse(info) ⇒ Object
690 691 692 |
# File 'lib/cloudi.rb', line 690 def self.info_key_value_parse(info) return text_pairs_parse(info) end |
.thread_count ⇒ Object
81 82 83 84 |
# File 'lib/cloudi.rb', line 81 def self.thread_count s = getenv('CLOUDI_API_INIT_THREAD_COUNT') s.to_i end |
Instance Method Details
#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/cloudi.rb', line 167 def forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) case request_type when ASYNC forward_async(name, request_info, request, timeout, priority, trans_id, pid) when SYNC forward_sync(name, request_info, request, timeout, priority, trans_id, pid) end end |
#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
179 180 181 182 183 184 185 186 187 |
# File 'lib/cloudi.rb', line 179 def forward_async(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardAsyncException end |
#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object
189 190 191 192 193 194 195 196 197 |
# File 'lib/cloudi.rb', line 189 def forward_sync(name, request_info, request, timeout, priority, trans_id, pid) send(Erlang.term_to_binary([:forward_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority, OtpErlangBinary.new(trans_id), pid])) raise ForwardSyncException end |
#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/cloudi.rb', line 149 def mcast_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:mcast_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#poll(timeout = nil) ⇒ Object
641 642 643 644 645 646 |
# File 'lib/cloudi.rb', line 641 def poll(timeout=nil) if timeout.nil? timeout = -1 end return poll_request(timeout, true) end |
#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/cloudi.rb', line 231 def recv_async(timeout=nil, trans_id=nil, consume=true) if timeout.nil? timeout = @timeout_sync end if trans_id.nil? trans_id = 0.chr * 16 end send(Erlang.term_to_binary([:recv_async, timeout, OtpErlangBinary.new(trans_id), consume])) return poll_request(nil, false) end |
#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/cloudi.rb', line 199 def return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) case request_type when ASYNC return_async(name, pattern, response_info, response, timeout, trans_id, pid) when SYNC return_sync(name, pattern, response_info, response, timeout, trans_id, pid) end end |
#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
211 212 213 214 215 216 217 218 219 |
# File 'lib/cloudi.rb', line 211 def return_async(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_async, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnAsyncException end |
#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object
221 222 223 224 225 226 227 228 229 |
# File 'lib/cloudi.rb', line 221 def return_sync(name, pattern, response_info, response, timeout, trans_id, pid) send(Erlang.term_to_binary([:return_sync, name, pattern, OtpErlangBinary.new(response_info), OtpErlangBinary.new(response), timeout, OtpErlangBinary.new(trans_id), pid])) raise ReturnSyncException end |
#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/cloudi.rb', line 113 def send_async(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_async end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_async, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/cloudi.rb', line 131 def send_sync(name, request, timeout=nil, request_info=nil, priority=nil) if timeout.nil? timeout = @timeout_sync end if request_info.nil? request_info = '' end if priority.nil? priority = @priority_default end send(Erlang.term_to_binary([:send_sync, name, OtpErlangBinary.new(request_info), OtpErlangBinary.new(request), timeout, priority])) return poll_request(nil, false) end |
#shutdown(reason = nil) ⇒ Object
648 649 650 651 652 653 |
# File 'lib/cloudi.rb', line 648 def shutdown(reason=nil) if reason.nil? reason = '' end send(Erlang.term_to_binary([:shutdown, reason])) end |
#subscribe(pattern, function) ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/cloudi.rb', line 86 def subscribe(pattern, function) key = @prefix + pattern value = @callbacks.fetch(key, nil) if value.nil? @callbacks[key] = [function] else value.push(function) end send(Erlang.term_to_binary([:subscribe, pattern])) end |
#subscribe_count(pattern) ⇒ Object
97 98 99 100 |
# File 'lib/cloudi.rb', line 97 def subscribe_count(pattern) send(Erlang.term_to_binary([:subscribe_count, pattern])) return poll_request(nil, false) end |
#unsubscribe(pattern) ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/cloudi.rb', line 102 def unsubscribe(pattern) key = @prefix + pattern value = @callbacks.fetch(key, nil) API.assert{value != nil} value.shift if value.empty? @callbacks.delete(key) end send(Erlang.term_to_binary([:unsubscribe, pattern])) end |