Class: CloudI::API

Inherits:
Object
  • Object
show all
Includes:
Erlang
Defined in:
lib/cloudi.rb

Constant Summary collapse

ASYNC =

unbuffered output is with $stderr.puts ‘…’

1
SYNC =
-1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_index) ⇒ API



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/cloudi.rb', line 43

def initialize(thread_index)
    protocol = API.getenv('CLOUDI_API_INIT_PROTOCOL')
    buffer_size_str = API.getenv('CLOUDI_API_INIT_BUFFER_SIZE')
    if protocol == 'tcp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    elsif protocol == 'udp'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = false
    elsif protocol == 'local'
        @s = IO.for_fd(thread_index + 3, File::RDWR, autoclose: false)
        @s.sync = true
        @use_header = true
    else
        raise InvalidInputException
    end
    @initialization_complete = false
    @terminate = false
    @size = buffer_size_str.to_i
    @callbacks = Hash.new
    @timeout_terminate = 1000 # TIMEOUT_TERMINATE_MIN
    send(Erlang.term_to_binary(:init))
    poll_request(nil, false)
end

Instance Attribute Details

#prefixObject (readonly)

Returns the value of attribute prefix.



73
74
75
# File 'lib/cloudi.rb', line 73

def prefix
  @prefix
end

#process_countObject (readonly)

Returns the value of attribute process_count.



70
71
72
# File 'lib/cloudi.rb', line 70

def process_count
  @process_count
end

#process_count_maxObject (readonly)

Returns the value of attribute process_count_max.



71
72
73
# File 'lib/cloudi.rb', line 71

def process_count_max
  @process_count_max
end

#process_count_minObject (readonly)

Returns the value of attribute process_count_min.



72
73
74
# File 'lib/cloudi.rb', line 72

def process_count_min
  @process_count_min
end

#process_indexObject (readonly)

Returns the value of attribute process_index.



69
70
71
# File 'lib/cloudi.rb', line 69

def process_index
  @process_index
end

#timeout_asyncObject (readonly)

Returns the value of attribute timeout_async.



75
76
77
# File 'lib/cloudi.rb', line 75

def timeout_async
  @timeout_async
end

#timeout_initializeObject (readonly)

Returns the value of attribute timeout_initialize.



74
75
76
# File 'lib/cloudi.rb', line 74

def timeout_initialize
  @timeout_initialize
end

#timeout_syncObject (readonly)

Returns the value of attribute timeout_sync.



76
77
78
# File 'lib/cloudi.rb', line 76

def timeout_sync
  @timeout_sync
end

#timeout_terminateObject (readonly)

Returns the value of attribute timeout_terminate.



77
78
79
# File 'lib/cloudi.rb', line 77

def timeout_terminate
  @timeout_terminate
end

Class Method Details

.assertObject



638
639
640
# File 'lib/cloudi.rb', line 638

def self.assert
    raise 'Assertion failed !' unless yield # if $DEBUG
end

.getenv(key) ⇒ Object



703
704
705
# File 'lib/cloudi.rb', line 703

def self.getenv(key)
    ENV[key] or raise InvalidInputException
end

.thread_countObject



79
80
81
82
# File 'lib/cloudi.rb', line 79

def self.thread_count
    s = getenv('CLOUDI_API_INIT_THREAD_COUNT')
    s.to_i
end

Instance Method Details

#forward_(request_type, name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/cloudi.rb', line 165

def forward_(request_type, name, request_info, request,
             timeout, priority, trans_id, pid)
    case request_type
    when ASYNC
        forward_async(name, request_info, request,
                      timeout, priority, trans_id, pid)
    when SYNC
        forward_sync(name, request_info, request,
                     timeout, priority, trans_id, pid)
    end
end

#forward_async(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



177
178
179
180
181
182
183
184
185
# File 'lib/cloudi.rb', line 177

def forward_async(name, request_info, request,
                  timeout, priority, trans_id, pid)
    send(Erlang.term_to_binary([:forward_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ForwardAsyncException.new()
end

#forward_sync(name, request_info, request, timeout, priority, trans_id, pid) ⇒ Object



187
188
189
190
191
192
193
194
195
# File 'lib/cloudi.rb', line 187

def forward_sync(name, request_info, request,
                 timeout, priority, trans_id, pid)
    send(Erlang.term_to_binary([:forward_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ForwardSyncException.new()
end

#info_key_value_parse(message_info) ⇒ Object



634
635
636
# File 'lib/cloudi.rb', line 634

def info_key_value_parse(message_info)
    return text_key_value_parse(message_info)
end

#mcast_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/cloudi.rb', line 147

def mcast_async(name, request,
                timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:mcast_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#poll(timeout = nil) ⇒ Object



611
612
613
614
615
616
# File 'lib/cloudi.rb', line 611

def poll(timeout=nil)
    if timeout.nil?
        timeout = -1
    end
    return poll_request(timeout, true)
end

#recv_async(timeout = nil, trans_id = nil, consume = true) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/cloudi.rb', line 229

def recv_async(timeout=nil, trans_id=nil, consume=true)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if trans_id.nil?
        trans_id = 0.chr * 16
    end
    send(Erlang.term_to_binary([:recv_async, timeout,
                                OtpErlangBinary.new(trans_id),
                                consume]))
    return poll_request(nil, false)
end

#return_(request_type, name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
# File 'lib/cloudi.rb', line 197

def return_(request_type, name, pattern, response_info, response,
            timeout, trans_id, pid)
    case request_type
    when ASYNC
        return_async(name, pattern, response_info, response,
                     timeout, trans_id, pid)
    when SYNC
        return_sync(name, pattern, response_info, response,
                    timeout, trans_id, pid)
    end
end

#return_async(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



209
210
211
212
213
214
215
216
217
# File 'lib/cloudi.rb', line 209

def return_async(name, pattern, response_info, response,
                 timeout, trans_id, pid)
    send(Erlang.term_to_binary([:return_async, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ReturnAsyncException.new()
end

#return_sync(name, pattern, response_info, response, timeout, trans_id, pid) ⇒ Object



219
220
221
222
223
224
225
226
227
# File 'lib/cloudi.rb', line 219

def return_sync(name, pattern, response_info, response,
                timeout, trans_id, pid)
    send(Erlang.term_to_binary([:return_sync, name, pattern,
                                OtpErlangBinary.new(response_info),
                                OtpErlangBinary.new(response),
                                timeout,
                                OtpErlangBinary.new(trans_id), pid]))
    raise ReturnSyncException.new()
end

#send_async(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/cloudi.rb', line 111

def send_async(name, request,
               timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_async
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_async, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#send_sync(name, request, timeout = nil, request_info = nil, priority = nil) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/cloudi.rb', line 129

def send_sync(name, request,
              timeout=nil, request_info=nil, priority=nil)
    if timeout.nil?
        timeout = @timeout_sync
    end
    if request_info.nil?
        request_info = ''
    end
    if priority.nil?
        priority = @priority_default
    end
    send(Erlang.term_to_binary([:send_sync, name,
                                OtpErlangBinary.new(request_info),
                                OtpErlangBinary.new(request),
                                timeout, priority]))
    return poll_request(nil, false)
end

#subscribe(pattern, function) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/cloudi.rb', line 84

def subscribe(pattern, function)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    if value.nil?
        @callbacks[key] = [function]
    else
        value.push(function)
    end
    send(Erlang.term_to_binary([:subscribe, pattern]))
end

#subscribe_count(pattern) ⇒ Object



95
96
97
98
# File 'lib/cloudi.rb', line 95

def subscribe_count(pattern)
    send(Erlang.term_to_binary([:subscribe_count, pattern]))
    return poll_request(nil, false)
end

#unsubscribe(pattern) ⇒ Object



100
101
102
103
104
105
106
107
108
109
# File 'lib/cloudi.rb', line 100

def unsubscribe(pattern)
    key = @prefix + pattern
    value = @callbacks.fetch(key, nil)
    API.assert{value != nil}
    value.shift
    if value.empty?
        @callbacks.delete(key)
    end
    send(Erlang.term_to_binary([:unsubscribe, pattern]))
end