Class: CloudI::API

Inherits:
Object
  • Object
show all
Includes:
Erlang
Defined in:
lib/cloudi.rb

Constant Summary collapse

ASYNC =

Note: for unbuffered output, use $stderr.puts '…'.

1
SYNC =
-1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_index) ⇒ API

Returns a new instance of API.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/cloudi.rb', line 44

def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size = API.getenv_to_uint('CLOUDI_API_INIT_BUFFER_SIZE')
    if protocol == 'tcp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    elsif protocol == 'udp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = false
    elsif protocol == 'local'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    else
        $stderr.puts 'CloudI service execution must occur in CloudI'
        raise InvalidInputException
    end
    @initialization_complete = false
    @terminate = false
    @size = buffer_size
    @callbacks = Hash.new
    @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
    send(Erlang.term_to_binary(:init))
    poll_request(nil, false)
end

Instance Attribute Details

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



75
76
77
# File 'lib/cloudi.rb', line 75

def prefix
  @prefix
end

#priority_default ⇒ Object (readonly)

Returns the value of attribute priority_default.



80
81
82
# File 'lib/cloudi.rb', line 80

def priority_default
  @priority_default
end

#process_count ⇒ Object (readonly)

Returns the value of attribute process_count.



72
73
74
# File 'lib/cloudi.rb', line 72

def process_count
  @process_count
end

#process_count_max ⇒ Object (readonly)

Returns the value of attribute process_count_max.



73
74
75
# File 'lib/cloudi.rb', line 73

def process_count_max
  @process_count_max
end

#process_count_min ⇒ Object (readonly)

Returns the value of attribute process_count_min.



74
75
76
# File 'lib/cloudi.rb', line 74

def process_count_min
  @process_count_min
end

#process_index ⇒ Object (readonly)

Returns the value of attribute process_index.



71
72
73
# File 'lib/cloudi.rb', line 71

def process_index
  @process_index
end

#timeout_async ⇒ Object (readonly)

Returns the value of attribute timeout_async.



77
78
79
# File 'lib/cloudi.rb', line 77

def timeout_async
  @timeout_async
end

#timeout_initialize ⇒ Object (readonly)

Returns the value of attribute timeout_initialize.



76
77
78
# File 'lib/cloudi.rb', line 76

def timeout_initialize
  @timeout_initialize
end

#timeout_sync ⇒ Object (readonly)

Returns the value of attribute timeout_sync.



78
79
80
# File 'lib/cloudi.rb', line 78

def timeout_sync
  @timeout_sync
end

#timeout_terminate ⇒ Object (readonly)

Returns the value of attribute timeout_terminate.



79
80
81
# File 'lib/cloudi.rb', line 79

def timeout_terminate
  @timeout_terminate
end

Class Method Details

.assert ⇒ Object

Raises:

  • (AssertionError)


737
738
739
# File 'lib/cloudi.rb', line 737

def self.assert
    raise AssertionError unless yield # if $DEBUG
end

.info_key_value_new(pairs, response = true) ⇒ Object



733
734
735
# File 'lib/cloudi.rb', line 733

def self.info_key_value_new(pairs, response = true)
    return text_pairs_new(pairs, response)
end

.info_key_value_parse(info) ⇒ Object



729
730
731
# File 'lib/cloudi.rb', line 729

def self.info_key_value_parse(info)
    return text_pairs_parse(info)
end

.process_count_max_ ⇒ Object



252
253
254
# File 'lib/cloudi.rb', line 252

def self.process_count_max_
    return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_COUNT_MAX')
end

.process_count_min_ ⇒ Object



256
257
258
# File 'lib/cloudi.rb', line 256

def self.process_count_min_
    return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_COUNT_MIN')
end

.process_index_ ⇒ Object



248
249
250
# File 'lib/cloudi.rb', line 248

def self.process_index_
    return API.getenv_to_uint('CLOUDI_API_INIT_PROCESS_INDEX')
end

.thread_count ⇒ Object



82
83
84
# File 'lib/cloudi.rb', line 82

def self.thread_count
    return API.getenv_to_uint('CLOUDI_API_INIT_THREAD_COUNT')
end

.timeout_initialize_ ⇒ Object



260
261
262
# File 'lib/cloudi.rb', line 260

def self.timeout_initialize_
    return API.getenv_to_uint('CLOUDI_API_INIT_TIMEOUT_INITIALIZE')
end

.timeout_terminate_ ⇒ Object



264
265
266
# File 'lib/cloudi.rb', line 264

def self.timeout_terminate_
    return API.getenv_to_uint('CLOUDI_API_INIT_TIMEOUT_TERMINATE')
end

Instance Method Details

#forward_(request_type, name, request_info, request, timeout, priority, trans_id, source) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/cloudi.rb', line 167

def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, source)
    case request_type
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, source)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, source)
    end
end

#forward_async(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object



179
180
181
182
183
184
185
186
187
188
# File 'lib/cloudi.rb', line 179

def forward_async(name, request_info, request,
                  timeout, priority, trans_id, source)
    send(Erlang.term_to_binary([:forward_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id),
                                source]))
    raise ForwardAsyncException
end

#forward_sync(name, request_info, request, timeout, priority, trans_id, source) ⇒ Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/cloudi.rb', line 190

def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, source)
    send(Erlang.term_to_binary([:forward_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id),
                                source]))
    raise ForwardSyncException
end

#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/cloudi.rb', line 149

def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:mcast_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#poll(timeout = nil) ⇒ Object



680
681
682
683
684
685
# File 'lib/cloudi.rb', line 680

def poll(timeout=nil)
    if timeout.nil?
        timeout = -1
    end
    return poll_request(timeout, true)
end

#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/cloudi.rb', line 235

def recv_async(timeout=nil, trans_id=nil, consume=true)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if trans_id.nil?
        trans_id = 0.chr * 16
    end
    send(Erlang.term_to_binary([:recv_async, timeout,
                                OtpErlangBinary.new(trans_id),
                                consume]))
    return poll_request(nil, false)
end

#return_(request_type, name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
# File 'lib/cloudi.rb', line 201

def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, source)
    case request_type
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, source)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, source)
    end
end

#return_async(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object



213
214
215
216
217
218
219
220
221
222
# File 'lib/cloudi.rb', line 213

def return_async(name, pattern, response_info, response,
                 timeout, trans_id, source)
    send(Erlang.term_to_binary([:return_async, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id),
                                source]))
    raise ReturnAsyncException
end

#return_sync(name, pattern, response_info, response, timeout, trans_id, source) ⇒ Object



224
225
226
227
228
229
230
231
232
233
# File 'lib/cloudi.rb', line 224

def return_sync(name, pattern, response_info, response,
                timeout, trans_id, source)
    send(Erlang.term_to_binary([:return_sync, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id),
                                source]))
    raise ReturnSyncException
end

#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/cloudi.rb', line 113

def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/cloudi.rb', line 131

def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#shutdown(reason = nil) ⇒ Object



687
688
689
690
691
692
# File 'lib/cloudi.rb', line 687

def shutdown(reason=nil)
    if reason.nil?
        reason = ''
    end
    send(Erlang.term_to_binary([:shutdown, reason]))
end

#subscribe(pattern, function) ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/cloudi.rb', line 86

def subscribe(pattern, function)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    if value.nil?
        @callbacks[key] = [function]
    else
        value.push(function)
    end
    send(Erlang.term_to_binary([:subscribe, pattern]))
end

#subscribe_count(pattern) ⇒ Object



97
98
99
100
# File 'lib/cloudi.rb', line 97

def subscribe_count(pattern)
    send(Erlang.term_to_binary([:subscribe_count, pattern]))
    return poll_request(nil, false)
end

#unsubscribe(pattern) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/cloudi.rb', line 102

def unsubscribe(pattern)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    API.assert{value != nil}
    value.shift
    if value.empty?
        @callbacks.delete(key)
    end
    send(Erlang.term_to_binary([:unsubscribe, pattern]))
end