Class: CloudI::API

Inherits:
Object
  • Object
show all
Includes:
Erlang
Defined in:
lib/cloudi.rb

Constant Summary collapse

ASYNC =

Unbuffered output can be written with $stderr.puts '…'

1
SYNC =
-1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_index) ⇒ API

Returns a new instance of API.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/cloudi.rb', line 44

def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
    if protocol == 'tcp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    elsif protocol == 'udp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = false
    elsif protocol == 'local'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    else
        $stderr.puts 'CloudI service execution must occur in CloudI'
        raise InvalidInputException
    end
    @initialization_complete = false
    @terminate = false
    @size = buffer_size_str.to_i
    @callbacks = Hash.new
    @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
    send(Erlang.term_to_binary(:init))
    poll_request(nil, false)
end

Instance Attribute Details

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



75
76
77
# File 'lib/cloudi.rb', line 75

def prefix
  @prefix
end

#process_count ⇒ Object (readonly)

Returns the value of attribute process_count.



72
73
74
# File 'lib/cloudi.rb', line 72

def process_count
  @process_count
end

#process_count_max ⇒ Object (readonly)

Returns the value of attribute process_count_max.



73
74
75
# File 'lib/cloudi.rb', line 73

def process_count_max
  @process_count_max
end

#process_count_min ⇒ Object (readonly)

Returns the value of attribute process_count_min.



74
75
76
# File 'lib/cloudi.rb', line 74

def process_count_min
  @process_count_min
end

#process_index ⇒ Object (readonly)

Returns the value of attribute process_index.



71
72
73
# File 'lib/cloudi.rb', line 71

def process_index
  @process_index
end

#timeout_async ⇒ Object (readonly)

Returns the value of attribute timeout_async.



77
78
79
# File 'lib/cloudi.rb', line 77

def timeout_async
  @timeout_async
end

#timeout_initialize ⇒ Object (readonly)

Returns the value of attribute timeout_initialize.



76
77
78
# File 'lib/cloudi.rb', line 76

def timeout_initialize
  @timeout_initialize
end

#timeout_sync ⇒ Object (readonly)

Returns the value of attribute timeout_sync.



78
79
80
# File 'lib/cloudi.rb', line 78

def timeout_sync
  @timeout_sync
end

#timeout_terminate ⇒ Object (readonly)

Returns the value of attribute timeout_terminate.



79
80
81
# File 'lib/cloudi.rb', line 79

def timeout_terminate
  @timeout_terminate
end

Class Method Details

.assert ⇒ Object

Raises:

  • (AssertionError)



698
699
700
# File 'lib/cloudi.rb', line 698

def self.assert
    raise AssertionError unless yield # if $DEBUG
end

.info_key_value_new(pairs, response = true) ⇒ Object



694
695
696
# File 'lib/cloudi.rb', line 694

def self.info_key_value_new(pairs, response = true)
    return text_pairs_new(pairs, response)
end

.info_key_value_parse(info) ⇒ Object



690
691
692
# File 'lib/cloudi.rb', line 690

def self.info_key_value_parse(info)
    return text_pairs_parse(info)
end

.thread_count ⇒ Object



81
82
83
84
# File 'lib/cloudi.rb', line 81

def self.thread_count
    s = getenv('CLOUDI_API_INIT_THREAD_COUNT')
    s.to_i
end

Instance Method Details

#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/cloudi.rb', line 167

def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
    case request_type
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, pid)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, pid)
    end
end

#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



179
180
181
182
183
184
185
186
187
# File 'lib/cloudi.rb', line 179

def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
    send(Erlang.term_to_binary([:forward_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ForwardAsyncException
end

#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



189
190
191
192
193
194
195
196
197
# File 'lib/cloudi.rb', line 189

def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
    send(Erlang.term_to_binary([:forward_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ForwardSyncException
end

#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/cloudi.rb', line 149

def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:mcast_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#poll(timeout = nil) ⇒ Object



641
642
643
644
645
646
# File 'lib/cloudi.rb', line 641

def poll(timeout=nil)
    if timeout.nil?
        timeout = -1
    end
    return poll_request(timeout, true)
end

#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/cloudi.rb', line 231

def recv_async(timeout=nil, trans_id=nil, consume=true)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if trans_id.nil?
        trans_id = 0.chr * 16
    end
    send(Erlang.term_to_binary([:recv_async, timeout,
                                OtpErlangBinary.new(trans_id),
                                consume]))
    return poll_request(nil, false)
end

#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/cloudi.rb', line 199

def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
    case request_type
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, pid)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, pid)
    end
end

#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/cloudi.rb', line 211

def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
    send(Erlang.term_to_binary([:return_async, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ReturnAsyncException
end

#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



221
222
223
224
225
226
227
228
229
# File 'lib/cloudi.rb', line 221

def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
    send(Erlang.term_to_binary([:return_sync, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ReturnSyncException
end

#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/cloudi.rb', line 113

def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/cloudi.rb', line 131

def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#shutdown(reason = nil) ⇒ Object



648
649
650
651
652
653
# File 'lib/cloudi.rb', line 648

def shutdown(reason=nil)
    if reason.nil?
        reason = ''
    end
    send(Erlang.term_to_binary([:shutdown, reason]))
end

#subscribe(pattern, function) ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/cloudi.rb', line 86

def subscribe(pattern, function)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    if value.nil?
        @callbacks[key] = [function]
    else
        value.push(function)
    end
    send(Erlang.term_to_binary([:subscribe, pattern]))
end

#subscribe_count(pattern) ⇒ Object



97
98
99
100
# File 'lib/cloudi.rb', line 97

def subscribe_count(pattern)
    send(Erlang.term_to_binary([:subscribe_count, pattern]))
    return poll_request(nil, false)
end

#unsubscribe(pattern) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/cloudi.rb', line 102

def unsubscribe(pattern)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    API.assert{value != nil}
    value.shift
    if value.empty?
        @callbacks.delete(key)
    end
    send(Erlang.term_to_binary([:unsubscribe, pattern]))
end