Class: Qpid::Proton::Reactor::Reactor
Constant Summary
collapse
- PROTON_METHOD_PREFIX =
"pn_reactor"
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#impl, #impl=, included, registry
#millis_to_sec, #millis_to_timeout, #sec_to_millis, #timeout_to_millis
included
#chandler
Constructor Details
#initialize(handlers, options = {}) ⇒ Reactor
Returns a new instance of Reactor.
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/reactor/reactor.rb', line 53
# Builds a Reactor around a native reactor object.
#
# @param handlers [Object, Array, nil] a single handler or a (possibly
#   nested) list of handlers; each one is added to this reactor's handler.
#   Nothing is added when nil.
# @param options [Hash] +:impl+ may carry an existing native reactor to use;
#   when it is missing or nil a new one is obtained from Cproton.pn_reactor.
def initialize(handlers, options = {})
  @impl = options[:impl]
  @impl = Cproton.pn_reactor if @impl.nil?

  unless handlers.nil?
    [handlers].flatten.each do |item|
      self.handler.add(item)
    end
  end

  @errors = []
  @handlers = []
  # Record this wrapper under the :pn_reactor_attachments key.
  self.class.store_instance(self, :pn_reactor_attachments)
end
|
Instance Attribute Details
#errors ⇒ Object
Returns the value of attribute errors.
45
46
47
|
# File 'lib/reactor/reactor.rb', line 45
# Reader for the +errors+ attribute.
#
# @return [Object] the current value of @errors
def errors
@errors
end
|
Class Method Details
.wrap(impl) ⇒ Object
47
48
49
50
51
|
# File 'lib/reactor/reactor.rb', line 47
# Returns the Reactor associated with a native reactor object.
#
# @param impl [Object, nil] the native reactor
# @return [Reactor, nil] nil when +impl+ is nil; otherwise the instance found
#   by fetch_instance under :pn_reactor_attachments, or a new Reactor built
#   around +impl+ when that lookup yields nothing.
def self.wrap(impl)
  return nil if impl.nil?

  existing = self.fetch_instance(impl, :pn_reactor_attachments)
  existing || Reactor.new(nil, impl: impl)
end
|
Instance Method Details
#acceptor(host, port, handler = nil) ⇒ Object
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/reactor/reactor.rb', line 155
# Asks the native reactor for an acceptor on the given host and port.
#
# @param host [String] the host passed to pn_reactor_acceptor
# @param port [Object] the port; it is converted to a String for the native call
# @param handler [Object, nil] optional handler, converted with chandler
# @return [Acceptor] wrapper around the native acceptor
# @raise [IOError] when the native call returns nil; the message carries the
#   reactor's I/O error text plus the host:port pair
def acceptor(host, port, handler = nil)
  impl = chandler(handler, self.method(:on_error))
  aimpl = Cproton.pn_reactor_acceptor(@impl, host, port.to_s, impl)
  # Drop our reference to the native handler once it has been handed over.
  Cproton.pn_decref(impl)
  return Acceptor.new(aimpl) unless aimpl.nil?

  # Look the error text up once and reuse it for the message.
  io_error = Cproton.pn_io_error(Cproton.pn_reactor_io(@impl))
  error_text = Cproton.pn_error_text(io_error)
  raise IOError, "(#{error_text} (#{host}:#{port}))"
end
|
#connection(handler = nil) ⇒ Object
170
171
172
173
174
175
|
# File 'lib/reactor/reactor.rb', line 170
# Creates a connection through the native reactor.
#
# @param handler [Object, nil] optional handler, converted with chandler
# @return [Object] the result of Qpid::Proton::Connection.wrap on the
#   native connection
def connection(handler = nil)
  native_handler = chandler(handler, self.method(:on_error))
  native_conn = Cproton.pn_reactor_connection(@impl, native_handler)
  # Wrap first, then release our reference to the native handler.
  Qpid::Proton::Connection.wrap(native_conn).tap do
    Cproton.pn_decref(native_handler)
  end
end
|
#global_handler ⇒ Object
79
80
81
82
|
# File 'lib/reactor/reactor.rb', line 79
# Fetches the native reactor's global handler.
#
# @return [Object] the result of WrappedHandler.wrap, given the native
#   handler and this reactor's on_error method as the error callback
def global_handler
  native_handler = Cproton.pn_reactor_get_global_handler(@impl)
  error_callback = self.method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(native_handler, error_callback)
end
|
#global_handler=(handler) ⇒ Object
84
85
86
87
88
|
# File 'lib/reactor/reactor.rb', line 84
# Installs a handler as the native reactor's global handler.
#
# @param handler [Object] the handler, converted with chandler using this
#   reactor's on_error method as the error callback
def global_handler=(handler)
  error_callback = self.method(:on_error)
  native_handler = chandler(handler, error_callback)
  Cproton.pn_reactor_set_global_handler(@impl, native_handler)
  # Release our reference after passing the handler to the reactor.
  Cproton.pn_decref(native_handler)
end
|
#handler ⇒ Object
106
107
108
109
|
# File 'lib/reactor/reactor.rb', line 106
# Fetches the native reactor's handler.
#
# @return [Object] the result of WrappedHandler.wrap, given the native
#   handler and this reactor's on_error method as the error callback
def handler
  native_handler = Cproton.pn_reactor_get_handler(@impl)
  error_callback = self.method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(native_handler, error_callback)
end
|
#handler=(handler) ⇒ Object
111
112
113
114
115
|
# File 'lib/reactor/reactor.rb', line 111
# Installs a handler as the native reactor's handler.
#
# @param handler [Object] the handler, converted with chandler using this
#   reactor's on_error method as the error callback
def handler=(handler)
  # The callback must be bound to this reactor (self), matching the other
  # handler-taking methods of this class.
  impl = chandler(handler, self.method(:on_error))
  Cproton.pn_reactor_set_handler(@impl, impl)
  # Release our reference after passing the handler to the reactor.
  Cproton.pn_decref(impl)
end
|
#on_error(info) ⇒ Object
74
75
76
77
|
# File 'lib/reactor/reactor.rb', line 74
# Error callback handed to wrapped handlers: records the error and then
# calls this reactor's +yield+ method.
#
# @param info [Object] the error information to append to +errors+
# @return [Object] whatever +self.yield+ returns
def on_error(info)
  errors.push(info)
  # Explicit receiver: a bare `yield` would be the block keyword.
  self.yield
end
|
#process ⇒ Object
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/reactor/reactor.rb', line 136
# Runs one processing pass of the native reactor.
#
# If any errors have been collected in +errors+, the backtrace of each one is
# printed and the most recent error is raised.
#
# @return [Object] the result of Cproton.pn_reactor_process
# @raise [Exception] the last entry of +errors+, when there is one
def process
  result = Cproton.pn_reactor_process(@impl)
  collected = self.errors
  unless collected.nil? || collected.empty?
    collected.each do |error|
      # An exception that was never raised has no backtrace (nil).
      print Array(error.backtrace).join("\n")
    end
    raise collected.last
  end
  result
end
|
#push_event(obj, etype) ⇒ Object
192
193
194
|
# File 'lib/reactor/reactor.rb', line 192
# Puts an event for +obj+ onto the native reactor's collector.
#
# @param obj [Object] the object the event refers to
# @param etype [#number] the event type; its +number+ is passed to the
#   native call
# @return [Object] the result of Cproton.pn_collector_put
def push_event(obj, etype)
  collector = Cproton.pn_reactor_collector(@impl)
  # NOTE(review): the original called pn_py2void, which is the Python
  # binding's helper; pn_rb2void is assumed to be the Ruby binding's
  # counterpart -- confirm against the SWIG interface.
  Cproton.pn_collector_put(collector, Qpid::Proton::Util::RBCTX, Cproton.pn_rb2void(obj), etype.number)
end
|
#quiesced? ⇒ Boolean
Returns whether the reactor has any unbuffered data.
70
71
72
|
# File 'lib/reactor/reactor.rb', line 70
# Reports the native reactor's quiesced state.
#
# @return [Boolean] the value returned by Cproton.pn_reactor_quiesced
def quiesced?
  native_reactor = @impl
  Cproton.pn_reactor_quiesced(native_reactor)
end
|
#run(&block) ⇒ Object
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/reactor/reactor.rb', line 117
# Drives the reactor until +process+ returns a falsy value.
#
# Sets the timeout, starts the reactor, then calls +process+ repeatedly;
# after each truthy pass the given block (if any) is invoked with no
# arguments. Finally the reactor is stopped.
#
# @yield called once after every truthy +process+ result
# @return [Object] whatever +stop+ returns
def run(&block)
  self.timeout = 3.14159265359
  self.start
  while self.process
    block.call if block
  end
  self.stop
end
|
#schedule(delay, task) ⇒ Object
148
149
150
151
152
153
|
# File 'lib/reactor/reactor.rb', line 148
# Schedules a task on the native reactor.
#
# @param delay [Numeric] the delay, converted with sec_to_millis before the
#   native call
# @param task [Object] the task handler, converted with chandler
# @return [Object] the result of Task.wrap on the native scheduled task
def schedule(delay, task)
  native_handler = chandler(task, self.method(:on_error))
  native_task = Cproton.pn_reactor_schedule(@impl, sec_to_millis(delay), native_handler)
  # Wrap first, then release our reference to the native handler.
  Task.wrap(native_task).tap do
    Cproton.pn_decref(native_handler)
  end
end
|
#selectable(handler = nil) ⇒ Object
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/reactor/reactor.rb', line 177
# Creates a selectable through the native reactor.
#
# When chandler yields a non-nil native handler, it is stored on the
# selectable's attachments record and our reference to it is released.
#
# @param handler [Object, nil] optional handler, converted with chandler
# @return [Object] the result of Selectable.wrap on the native selectable
def selectable(handler = nil)
  native_handler = chandler(handler, self.method(:on_error))
  wrapped = Selectable.wrap(Cproton.pn_reactor_selectable(@impl))
  return wrapped if native_handler.nil?

  attachments = Cproton.pn_selectable_attachments(wrapped.impl)
  Cproton.pn_record_set_handler(attachments, native_handler)
  Cproton.pn_decref(native_handler)
  wrapped
end
|
#timeout ⇒ Fixnum
Returns the timeout period.
94
95
96
|
# File 'lib/reactor/reactor.rb', line 94
# Reads the native reactor's timeout.
#
# @return [Object] the native timeout value after conversion with
#   millis_to_timeout
def timeout
  raw_timeout = Cproton.pn_reactor_get_timeout(@impl)
  millis_to_timeout(raw_timeout)
end
|
#timeout=(timeout) ⇒ Object
102
103
104
|
# File 'lib/reactor/reactor.rb', line 102
# Sets the native reactor's timeout.
#
# @param timeout [Object] the timeout; converted with timeout_to_millis
#   before being passed to the native call
def timeout=(timeout)
  raw_timeout = timeout_to_millis(timeout)
  Cproton.pn_reactor_set_timeout(@impl, raw_timeout)
end
|
#update(sel) ⇒ Object
188
189
190
|
# File 'lib/reactor/reactor.rb', line 188
# Passes a selectable to the native reactor's update call.
#
# @param sel [#impl] the selectable; its +impl+ is handed to the native call
# @return [Object] the result of Cproton.pn_reactor_update
def update(sel)
  native_selectable = sel.impl
  Cproton.pn_reactor_update(@impl, native_selectable)
end
|
#wakeup ⇒ Object
128
129
130
131
132
133
134
|
# File 'lib/reactor/reactor.rb', line 128
# Calls the native reactor's wakeup.
#
# @return [nil] when the native call returns zero
# @raise [IOError] when the native call returns non-zero; the message is the
#   text of the reactor's I/O error, matching how #acceptor reports failures
def wakeup
  return if Cproton.pn_reactor_wakeup(@impl).zero?

  io = Cproton.pn_reactor_io(@impl)
  # Raise with the error's text rather than the native error object itself.
  raise IOError, Cproton.pn_error_text(Cproton.pn_io_error(io))
end
|