Class: CloudI::API

Inherits:
Object
  • Object
show all
Includes:
Erlang
Defined in:
lib/cloudi.rb

Constant Summary collapse

ASYNC =

unbuffered output is with $stderr.puts ‘…’

1
SYNC =
-1

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_index) ⇒ API

Returns a new instance of API.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/cloudi.rb', line 57

def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
    if protocol == 'tcp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    elsif protocol == 'udp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = false
    elsif protocol == 'local'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    else
        raise InvalidInputException
    end
    @initialization_complete = false
    @size = buffer_size_str.to_i
    @callbacks = Hash.new
    send(term_to_binary(:init))
    poll_request(false)
end

Class Method Details

.assertObject



591
592
593
# File 'lib/cloudi.rb', line 591

def self.assert
    raise 'Assertion failed !' unless yield # if $DEBUG
end

.thread_countObject



82
83
84
85
# File 'lib/cloudi.rb', line 82

def self.thread_count
    s = getenv('CLOUDI_API_INIT_THREAD_COUNT')
    s.to_i
end

Instance Method Details

#forward_(command, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
# File 'lib/cloudi.rb', line 163

def forward_(command, name, request_info, request,
             timeout, priority, trans_id, pid)
    case command
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, pid)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, pid)
    end
end

#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/cloudi.rb', line 175

def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
    if @requestTimeoutAdjustment
        if timeout == @request_timeout
            elapsed = [0,
                       ((Time.now - @request_timer) * 1000.0).floor].max
            if elapsed > timeout
                timeout = 0
            else
                timeout -= elapsed
            end
        end
    end
    send(term_to_binary([:forward_async, name,
                         OtpErlangBinary.new(request_info),
                         OtpErlangBinary.new(request),
                         timeout, priority,
                         OtpErlangBinary.new(trans_id), pid]))
    raise ForwardAsyncException.new()
end

#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/cloudi.rb', line 196

def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
    if @requestTimeoutAdjustment
        if timeout == @request_timeout
            elapsed = [0,
                       ((Time.now - @request_timer) * 1000.0).floor].max
            if elapsed > timeout
                timeout = 0
            else
                timeout -= elapsed
            end
        end
    end
    send(term_to_binary([:forward_sync, name, 
                         OtpErlangBinary.new(request_info),
                         OtpErlangBinary.new(request),
                         timeout, priority,
                         OtpErlangBinary.new(trans_id), pid]))
    raise ForwardSyncException.new()
end

#info_key_value_parse(message_info) ⇒ Object



587
588
589
# File 'lib/cloudi.rb', line 587

def info_key_value_parse(message_info)
    binary_key_value_parse(message_info)
end

#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/cloudi.rb', line 145

def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeoutAsync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priorityDefault
    end
    send(term_to_binary([:mcast_async, name,
                          OtpErlangBinary.new(request_info),
                          OtpErlangBinary.new(request),
                          timeout, priority]))
    return poll_request(false)
end

#pollObject



563
564
565
# File 'lib/cloudi.rb', line 563

def poll
    poll_request(true)
end

#prefixObject



303
304
305
# File 'lib/cloudi.rb', line 303

def prefix
    return @prefix
end

#process_countObject



291
292
293
# File 'lib/cloudi.rb', line 291

def process_count
    return @processCount
end

#process_count_maxObject



295
296
297
# File 'lib/cloudi.rb', line 295

def process_count_max
    return @processCountMax
end

#process_count_minObject



299
300
301
# File 'lib/cloudi.rb', line 299

def process_count_min
    return @processCountMin
end

#process_indexObject



287
288
289
# File 'lib/cloudi.rb', line 287

def process_index
    return @processIndex
end

#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object



275
276
277
278
279
280
281
282
283
284
285
# File 'lib/cloudi.rb', line 275

def recv_async(timeout=nil, trans_id=nil, consume=true)
    if timeout.nil?
        timeout = @timeoutSync
    end
    if trans_id.nil?
        trans_id = 0.chr * 16
    end
    send(term_to_binary([:recv_async, timeout,
                          OtpErlangBinary.new(trans_id), consume]))
    return poll_request(false)
end

#request_http_qs_parse(request) ⇒ Object



583
584
585
# File 'lib/cloudi.rb', line 583

def request_http_qs_parse(request)
    binary_key_value_parse(request)
end

#return_(command, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
# File 'lib/cloudi.rb', line 217

def return_(command, name, pattern, response_info, response,
            timeout, trans_id, pid)
    case command
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, pid)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, pid)
    end
end

#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/cloudi.rb', line 229

def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
    if @requestTimeoutAdjustment
        if timeout == @request_timeout
            elapsed = [0,
                       ((Time.now - @request_timer) * 1000.0).floor].max
            if elapsed > timeout
                response_info = ''
                response = ''
                timeout = 0
            else
                timeout -= elapsed
            end
        end
    end
    send(term_to_binary([:return_async, name, pattern,
                         OtpErlangBinary.new(response_info),
                         OtpErlangBinary.new(response),
                         timeout,
                         OtpErlangBinary.new(trans_id), pid]))
    raise ReturnAsyncException.new()
end

#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/cloudi.rb', line 252

def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
    if @requestTimeoutAdjustment
        if timeout == @request_timeout
            elapsed = [0,
                       ((Time.now - @request_timer) * 1000.0).floor].max
            if elapsed > timeout
                response_info = ''
                response = ''
                timeout = 0
            else
                timeout -= elapsed
            end
        end
    end
    send(term_to_binary([:return_sync, name, pattern,
                         OtpErlangBinary.new(response_info),
                         OtpErlangBinary.new(response),
                         timeout,
                         OtpErlangBinary.new(trans_id), pid]))
    raise ReturnSyncException.new()
end

#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/cloudi.rb', line 109

def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeoutAsync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priorityDefault
    end
    send(term_to_binary([:send_async, name,
                          OtpErlangBinary.new(request_info),
                          OtpErlangBinary.new(request),
                          timeout, priority]))
    return poll_request(false)
end

#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/cloudi.rb', line 127

def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeoutSync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priorityDefault
    end
    send(term_to_binary([:send_sync, name,
                          OtpErlangBinary.new(request_info),
                          OtpErlangBinary.new(request),
                          timeout, priority]))
    return poll_request(false)
end

#subscribe(pattern, function) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/cloudi.rb', line 87

def subscribe(pattern, function)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    if value.nil?
        @callbacks[key] = [function]
    else
        value.push(function)
    end
    send(term_to_binary([:subscribe, pattern]))
end

#timeout_asyncObject



307
308
309
# File 'lib/cloudi.rb', line 307

def timeout_async
    return @timeoutAsync
end

#timeout_syncObject



311
312
313
# File 'lib/cloudi.rb', line 311

def timeout_sync
    return @timeoutSync
end

#unsubscribe(pattern) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/cloudi.rb', line 98

def unsubscribe(pattern)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    API.assert{value != nil}
    value.shift
    if value.empty?
        @callbacks.delete(key)
    end
    send(term_to_binary([:unsubscribe, pattern]))
end