Class: CloudI::API

Inherits:
Object
  • Object
show all
Includes:
Erlang
Defined in:
lib/cloudi.rb

Constant Summary collapse

ASYNC =

Note: unbuffered output can be written with $stderr.puts '…'.

1
SYNC =
-1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_index) ⇒ API

Returns a new instance of API.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/cloudi.rb', line 44

# Sets up the API instance for the given thread.
#
# Reads CLOUDI_API_INIT_PROTOCOL and CLOUDI_API_INIT_BUFFER_SIZE through
# API.getenv, wraps file descriptor (thread_index + 3) in an IO object with
# sync enabled, sends the encoded :init term and then calls
# poll_request(nil, false).
#
# @param thread_index [Integer] added to 3 to select the file descriptor
# @raise [InvalidInputException] when the protocol is not 'tcp', 'udp' or
#   'local' (a message is written to $stderr first)
def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
    # the three supported protocols differ only in the @use_header flag
    case protocol
    when 'tcp', 'local'
        use_header = true
    when 'udp'
        use_header = false
    else
        $stderr.puts 'CloudI service execution must occur in CloudI'
        raise InvalidInputException
    end
    @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
    @s.sync = true
    @use_header = use_header
    @initialization_complete = false
    @terminate = false
    @size = buffer_size_str.to_i
    @callbacks = {}
    @timeout_terminate = 10 # TIMEOUT_TERMINATE_MIN
    send(Erlang.term_to_binary(:init))
    poll_request(nil, false)
end

Instance Attribute Details

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



75
76
77
# File 'lib/cloudi.rb', line 75

# Reader for the +prefix+ attribute.
#
# @return [Object] the value held in @prefix (nil when it was never set)
def prefix
    return @prefix
end

#priority_default ⇒ Object (readonly)

Returns the value of attribute priority_default.



80
81
82
# File 'lib/cloudi.rb', line 80

# Reader for the +priority_default+ attribute.
#
# @return [Object] the value held in @priority_default (nil when it was
#   never set)
def priority_default
    return @priority_default
end

#process_count ⇒ Object (readonly)

Returns the value of attribute process_count.



72
73
74
# File 'lib/cloudi.rb', line 72

# Reader for the +process_count+ attribute.
#
# @return [Object] the value held in @process_count (nil when it was
#   never set)
def process_count
    return @process_count
end

#process_count_max ⇒ Object (readonly)

Returns the value of attribute process_count_max.



73
74
75
# File 'lib/cloudi.rb', line 73

# Reader for the +process_count_max+ attribute.
#
# @return [Object] the value held in @process_count_max (nil when it was
#   never set)
def process_count_max
    return @process_count_max
end

#process_count_min ⇒ Object (readonly)

Returns the value of attribute process_count_min.



74
75
76
# File 'lib/cloudi.rb', line 74

# Reader for the +process_count_min+ attribute.
#
# @return [Object] the value held in @process_count_min (nil when it was
#   never set)
def process_count_min
    return @process_count_min
end

#process_index ⇒ Object (readonly)

Returns the value of attribute process_index.



71
72
73
# File 'lib/cloudi.rb', line 71

# Reader for the +process_index+ attribute.
#
# @return [Object] the value held in @process_index (nil when it was
#   never set)
def process_index
    return @process_index
end

#timeout_async ⇒ Object (readonly)

Returns the value of attribute timeout_async.



77
78
79
# File 'lib/cloudi.rb', line 77

# Reader for the +timeout_async+ attribute.
#
# @return [Object] the value held in @timeout_async (nil when it was
#   never set)
def timeout_async
    return @timeout_async
end

#timeout_initialize ⇒ Object (readonly)

Returns the value of attribute timeout_initialize.



76
77
78
# File 'lib/cloudi.rb', line 76

# Reader for the +timeout_initialize+ attribute.
#
# @return [Object] the value held in @timeout_initialize (nil when it was
#   never set)
def timeout_initialize
    return @timeout_initialize
end

#timeout_sync ⇒ Object (readonly)

Returns the value of attribute timeout_sync.



78
79
80
# File 'lib/cloudi.rb', line 78

# Reader for the +timeout_sync+ attribute.
#
# @return [Object] the value held in @timeout_sync (nil when it was
#   never set)
def timeout_sync
    return @timeout_sync
end

#timeout_terminate ⇒ Object (readonly)

Returns the value of attribute timeout_terminate.



79
80
81
# File 'lib/cloudi.rb', line 79

# Reader for the +timeout_terminate+ attribute.
#
# @return [Object] the value held in @timeout_terminate (nil when it was
#   never set)
def timeout_terminate
    return @timeout_terminate
end

Class Method Details

.assert ⇒ Object

Raises:
  • (AssertionError)



699
700
701
# File 'lib/cloudi.rb', line 699

# Evaluates the given block and raises when its result is falsy.
#
# The check runs unconditionally; the `if $DEBUG` guard in the original
# line is only a trailing comment and has no effect.
#
# @yieldreturn [Object] the condition being asserted
# @return [nil] when the block returns a truthy value
# @raise [AssertionError] when the block returns nil or false
def self.assert
    return nil if yield
    raise AssertionError
end

.info_key_value_new(pairs, response = true) ⇒ Object



695
696
697
# File 'lib/cloudi.rb', line 695

# Delegates to text_pairs_new with the same arguments.
#
# @param pairs [Object] passed through unchanged to text_pairs_new
# @param response [Boolean] passed through unchanged (defaults to true)
# @return [Object] whatever text_pairs_new returns
def self.info_key_value_new(pairs, response = true)
    text_pairs_new(pairs, response)
end

.info_key_value_parse(info) ⇒ Object



691
692
693
# File 'lib/cloudi.rb', line 691

# Delegates to text_pairs_parse with the same argument.
#
# @param info [Object] passed through unchanged to text_pairs_parse
# @return [Object] whatever text_pairs_parse returns
def self.info_key_value_parse(info)
    text_pairs_parse(info)
end

.thread_count ⇒ Object



82
83
84
85
# File 'lib/cloudi.rb', line 82

# Reads CLOUDI_API_INIT_THREAD_COUNT through getenv and converts it with
# to_i.
#
# @return [Integer] the converted value (to_i yields 0 for non-numeric
#   strings)
def self.thread_count
    getenv('CLOUDI_API_INIT_THREAD_COUNT').to_i
end

Instance Method Details

#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/cloudi.rb', line 168

# Forwards a request using the variant selected by +request_type+.
#
# Dispatches to forward_async when request_type is ASYNC and to
# forward_sync when it is SYNC; in this file both of those raise their own
# exception after sending, so a successful dispatch does not return.
#
# @param request_type [Integer] ASYNC or SYNC
# @param name [Object] passed through to the selected method
# @param request_info [Object] passed through to the selected method
# @param request [Object] passed through to the selected method
# @param timeout [Object] passed through to the selected method
# @param priority [Object] passed through to the selected method
# @param trans_id [Object] passed through to the selected method
# @param pid [Object] passed through to the selected method
# @raise [InvalidInputException] when request_type is neither ASYNC nor
#   SYNC
def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
    case request_type
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, pid)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, pid)
    else
        # an unknown type used to fall through and return nil, so the
        # request was silently never forwarded
        raise InvalidInputException
    end
end

#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object

Raises:
  • (ForwardAsyncException)



180
181
182
183
184
185
186
187
188
# File 'lib/cloudi.rb', line 180

# Encodes a :forward_async tuple, passes it to send, then raises.
#
# request_info, request and trans_id are wrapped in OtpErlangBinary before
# encoding. The method never returns normally: ForwardAsyncException is
# raised after the message has been handed to send.
#
# @raise [ForwardAsyncException] always, once the message is sent
def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
    message = [:forward_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ForwardAsyncException
end

#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object

Raises:
  • (ForwardSyncException)



190
191
192
193
194
195
196
197
198
# File 'lib/cloudi.rb', line 190

# Encodes a :forward_sync tuple, passes it to send, then raises.
#
# request_info, request and trans_id are wrapped in OtpErlangBinary before
# encoding. The method never returns normally: ForwardSyncException is
# raised after the message has been handed to send.
#
# @raise [ForwardSyncException] always, once the message is sent
def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
    message = [:forward_sync, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ForwardSyncException
end

#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/cloudi.rb', line 150

# Encodes a :mcast_async tuple, passes it to send and returns the result
# of poll_request(nil, false).
#
# @param name [Object] placed in the tuple as given
# @param request [Object] wrapped in OtpErlangBinary
# @param timeout [Object, nil] nil selects @timeout_async
# @param request_info [Object, nil] nil selects '' (wrapped in
#   OtpErlangBinary)
# @param priority [Object, nil] nil selects @priority_default
# @return [Object] whatever poll_request(nil, false) returns
def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    # fill in defaults only for arguments that are exactly nil
    timeout = @timeout_async if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:mcast_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end

#poll(timeout = nil) ⇒ Object



642
643
644
645
646
647
# File 'lib/cloudi.rb', line 642

# Calls poll_request with the given timeout and a second argument of true.
#
# @param timeout [Object, nil] nil is replaced by -1 before the call
# @return [Object] whatever poll_request returns
def poll(timeout=nil)
    effective_timeout = timeout.nil? ? -1 : timeout
    poll_request(effective_timeout, true)
end

#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/cloudi.rb', line 232

# Encodes a :recv_async tuple, passes it to send and returns the result of
# poll_request(nil, false).
#
# @param timeout [Object, nil] nil selects @timeout_sync
# @param trans_id [String, nil] nil selects a string of 16 NUL bytes
#   (presumably meaning "no specific transaction" — confirm against the
#   CloudI protocol); wrapped in OtpErlangBinary
# @param consume [Boolean] placed in the tuple as given
# @return [Object] whatever poll_request(nil, false) returns
def recv_async(timeout=nil, trans_id=nil, consume=true)
    timeout = @timeout_sync if timeout.nil?
    trans_id = 0.chr * 16 if trans_id.nil?
    message = [:recv_async, timeout,
               OtpErlangBinary.new(trans_id),
               consume]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end

#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
# File 'lib/cloudi.rb', line 200

# Returns a response using the variant selected by +request_type+.
#
# Dispatches to return_async when request_type is ASYNC and to return_sync
# when it is SYNC; in this file both of those raise their own exception
# after sending, so a successful dispatch does not return.
#
# @param request_type [Integer] ASYNC or SYNC
# @param name [Object] passed through to the selected method
# @param pattern [Object] passed through to the selected method
# @param response_info [Object] passed through to the selected method
# @param response [Object] passed through to the selected method
# @param timeout [Object] passed through to the selected method
# @param trans_id [Object] passed through to the selected method
# @param pid [Object] passed through to the selected method
# @raise [InvalidInputException] when request_type is neither ASYNC nor
#   SYNC
def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
    case request_type
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, pid)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, pid)
    else
        # an unknown type used to fall through and return nil, so the
        # response was silently never sent
        raise InvalidInputException
    end
end

#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object

Raises:
  • (ReturnAsyncException)



212
213
214
215
216
217
218
219
220
# File 'lib/cloudi.rb', line 212

# Encodes a :return_async tuple, passes it to send, then raises.
#
# response_info, response and trans_id are wrapped in OtpErlangBinary
# before encoding. The method never returns normally: ReturnAsyncException
# is raised after the message has been handed to send.
#
# @raise [ReturnAsyncException] always, once the message is sent
def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
    message = [:return_async, name, pattern,
               OtpErlangBinary.new(response_info),
               OtpErlangBinary.new(response),
               timeout,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ReturnAsyncException
end

#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object

Raises:
  • (ReturnSyncException)



222
223
224
225
226
227
228
229
230
# File 'lib/cloudi.rb', line 222

# Encodes a :return_sync tuple, passes it to send, then raises.
#
# response_info, response and trans_id are wrapped in OtpErlangBinary
# before encoding. The method never returns normally: ReturnSyncException
# is raised after the message has been handed to send.
#
# @raise [ReturnSyncException] always, once the message is sent
def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
    message = [:return_sync, name, pattern,
               OtpErlangBinary.new(response_info),
               OtpErlangBinary.new(response),
               timeout,
               OtpErlangBinary.new(trans_id), pid]
    send(Erlang.term_to_binary(message))
    raise ReturnSyncException
end

#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/cloudi.rb', line 114

# Encodes a :send_async tuple, passes it to send and returns the result of
# poll_request(nil, false).
#
# @param name [Object] placed in the tuple as given
# @param request [Object] wrapped in OtpErlangBinary
# @param timeout [Object, nil] nil selects @timeout_async
# @param request_info [Object, nil] nil selects '' (wrapped in
#   OtpErlangBinary)
# @param priority [Object, nil] nil selects @priority_default
# @return [Object] whatever poll_request(nil, false) returns
def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    # fill in defaults only for arguments that are exactly nil
    timeout = @timeout_async if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:send_async, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end

#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/cloudi.rb', line 132

# Encodes a :send_sync tuple, passes it to send and returns the result of
# poll_request(nil, false).
#
# @param name [Object] placed in the tuple as given
# @param request [Object] wrapped in OtpErlangBinary
# @param timeout [Object, nil] nil selects @timeout_sync
# @param request_info [Object, nil] nil selects '' (wrapped in
#   OtpErlangBinary)
# @param priority [Object, nil] nil selects @priority_default
# @return [Object] whatever poll_request(nil, false) returns
def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    # fill in defaults only for arguments that are exactly nil
    timeout = @timeout_sync if timeout.nil?
    request_info = '' if request_info.nil?
    priority = @priority_default if priority.nil?
    message = [:send_sync, name,
               OtpErlangBinary.new(request_info),
               OtpErlangBinary.new(request),
               timeout, priority]
    send(Erlang.term_to_binary(message))
    poll_request(nil, false)
end

#shutdown(reason = nil) ⇒ Object



649
650
651
652
653
654
# File 'lib/cloudi.rb', line 649

# Encodes a [:shutdown, reason] tuple and passes it to send.
#
# @param reason [Object, nil] nil is replaced by '' before encoding
# @return [Object] whatever send returns
def shutdown(reason=nil)
    reason = '' if reason.nil?
    send(Erlang.term_to_binary([:shutdown, reason]))
end

#subscribe(pattern, function) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/cloudi.rb', line 87

# Registers +function+ under the key @prefix + pattern and sends a
# [:subscribe, pattern] tuple.
#
# Functions registered for the same key are kept in an array in
# registration order.
#
# @param pattern [String] appended to @prefix to form the callback key;
#   sent without the prefix
# @param function [Object] appended to the key's callback array
# @return [Object] whatever send returns
def subscribe(pattern, function)
    full_name = @prefix + pattern
    handlers = @callbacks.fetch(full_name, nil)
    if handlers.nil?
        # first subscription for this key: start a new callback list
        handlers = []
        @callbacks[full_name] = handlers
    end
    handlers.push(function)
    send(Erlang.term_to_binary([:subscribe, pattern]))
end

#subscribe_count(pattern) ⇒ Object



98
99
100
101
# File 'lib/cloudi.rb', line 98

# Encodes a [:subscribe_count, pattern] tuple, passes it to send and
# returns the result of poll_request(nil, false).
#
# @param pattern [Object] placed in the tuple as given
# @return [Object] whatever poll_request(nil, false) returns
def subscribe_count(pattern)
    message = Erlang.term_to_binary([:subscribe_count, pattern])
    send(message)
    poll_request(nil, false)
end

#unsubscribe(pattern) ⇒ Object



103
104
105
106
107
108
109
110
111
112
# File 'lib/cloudi.rb', line 103

# Removes the first function registered under @prefix + pattern and sends
# an [:unsubscribe, pattern] tuple.
#
# The key is deleted from @callbacks once its array becomes empty.
# API.assert is used to check that the key has a registered array.
#
# @param pattern [String] appended to @prefix to form the callback key;
#   sent without the prefix
# @return [Object] whatever send returns
def unsubscribe(pattern)
    full_name = @prefix + pattern
    handlers = @callbacks.fetch(full_name, nil)
    API.assert { !handlers.nil? }
    # drop the oldest registration for this key
    handlers.shift
    @callbacks.delete(full_name) if handlers.empty?
    send(Erlang.term_to_binary([:unsubscribe, pattern]))
end