Class: Qpid::Proton::Container
- Inherits:
-
Object
- Object
- Qpid::Proton::Container
- Defined in:
- lib/core/container.rb
Overview
An AMQP container manages a set of Listeners and Connections which contain #Sender and #Receiver links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.
One or more threads can call #run, events generated by all the listeners and connections will be dispatched in the #run threads.
Direct Known Subclasses
Defined Under Namespace
Classes: ConnectionTask, ListenTask, StoppedError
Instance Attribute Summary collapse
-
#auto_stop ⇒ Bool
Auto-stop flag.
-
#handler ⇒ MessagingHandler
readonly
The container-wide handler.
-
#id ⇒ String
readonly
Unique identifier for this container.
-
#stopped ⇒ Bool
True if the container has been stopped and can no longer be used.
Instance Method Summary collapse
-
#connect(url, opts = nil) ⇒ Connection
Open an AMQP connection.
-
#connect_io(io, opts = nil) ⇒ Object
Open an AMQP protocol connection on an existing IO object.
-
#initialize(*args) ⇒ Container
constructor
Create a new Container.
-
#listen(url, handler = Listener::Handler.new) ⇒ Listener
Listen for incoming AMQP connections.
-
#listen_io(io, handler = Listener::Handler.new) ⇒ Object
Listen for incoming AMQP connections on an existing server socket.
-
#run ⇒ Object
Run the container: wait for IO activity, dispatch events to handlers.
-
#running ⇒ Bool
Number of threads in #run.
-
#stop(error = nil) ⇒ Object
Stop the container.
Constructor Details
#initialize(id = nil) ⇒ Container #initialize(handler = nil, id = nil) ⇒ Container
Create a new Container
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/core/container.rb', line 108 def initialize(*args) case args.size when 2 then @handler, @id = args when 1 then @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol) @handler = args[0] unless @id else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2" end # Use an empty messaging adapter to give default behaviour if there's no global handler. @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil) @id = (@id || SecureRandom.uuid).freeze # Implementation note: # # - #run threads take work from @work # - Each driver and the Container itself is processed by at most one #run thread at a time # - The Container thread does IO.select # - nil on the @work queue makes a #run thread exit @work = Queue.new @work << :start << self # Issue start and start start selecting @wake = IO.pipe # Wakes #run thread in IO.select @auto_stop = true # Stop when @active drops to 0 # Following instance variables protected by lock @lock = Mutex.new @active = 0 # All active tasks, in @selectable, @work or being processed @selectable = Set.new # Tasks ready to block in IO.select @running = 0 # Count of #run threads @stopped = false # #stop called @stop_err = nil # Optional error to pass to tasks, from #stop end |
Instance Attribute Details
#auto_stop ⇒ Bool
155 156 157 |
# File 'lib/core/container.rb', line 155 def auto_stop @auto_stop end |
#handler ⇒ MessagingHandler (readonly)
Returns The container-wide handler.
142 143 144 |
# File 'lib/core/container.rb', line 142 def handler @handler end |
#id ⇒ String (readonly)
Returns unique identifier for this container.
145 146 147 |
# File 'lib/core/container.rb', line 145 def id @id end |
#stopped ⇒ Bool
True if the container has been stopped and can no longer be used.
159 160 161 |
# File 'lib/core/container.rb', line 159 def stopped @stopped end |
Instance Method Details
#connect(url, opts = nil) ⇒ Connection
172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/core/container.rb', line 172 def connect(url, opts=nil) not_stopped url = Qpid::Proton::uri url opts ||= {} if url.user || url.password opts[:user] ||= url.user opts[:password] ||= url.password end opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps" connect_io(TCPSocket.new(url.host, url.port), opts) end |
#connect_io(io, opts = nil) ⇒ Object
Open an AMQP protocol connection on an existing IO object
187 188 189 190 191 192 193 |
# File 'lib/core/container.rb', line 187 def connect_io(io, opts=nil) not_stopped cd = connection_driver(io, opts) cd.connection.open() add(cd) cd.connection end |
#listen(url, handler = Listener::Handler.new) ⇒ Listener
Listen for incoming AMQP connections
with events for this listener and can generate a new set of options for each one.
202 203 204 205 206 207 |
# File 'lib/core/container.rb', line 202 def listen(url, handler=Listener::Handler.new) not_stopped url = Qpid::Proton::uri url # TODO aconway 2017-11-01: amqps, SSL listen_io(TCPServer.new(url.host, url.port), handler) end |
#listen_io(io, handler = Listener::Handler.new) ⇒ Object
Listen for incoming AMQP connections on an existing server socket.
213 214 215 216 217 218 |
# File 'lib/core/container.rb', line 213 def listen_io(io, handler=Listener::Handler.new) not_stopped l = ListenTask.new(io, handler, self) add(l) l end |
#run ⇒ Object
Run the container: wait for IO activity, dispatch events to handlers.
More than one thread can call #run concurrently, the container will use all the #run threads as a thread pool. Calls to Handler::MessagingHandler methods are serialized for each connection or listener, even if the container has multiple threads.
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/core/container.rb', line 227 def run @lock.synchronize do @running += 1 # Note: ensure clause below will decrement @running raise StoppedError if @stopped end while task = @work.pop case task when :start @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start when Container r, w = [@wake[0]], [] @lock.synchronize do @selectable.each do |s| r << s if s.send :can_read? w << s if s.send :can_write? end end r, w = IO.select(r, w) selected = Set.new(r).merge(w) drain_wake if selected.delete?(@wake[0]) stop_select = nil @lock.synchronize do if stop_select = @stopped # close everything selected += @selectable selected.each { |s| s.close @stop_err } @wake.each { |fd| fd.close() } end @selectable -= selected # Remove selected tasks end selected.each { |s| @work << s } # Queue up tasks needing #process @work << self unless stop_select when ConnectionTask then task.process rearm task when ListenTask then io, opts = task.process add(connection_driver(io, opts, true)) if io rearm task end # TODO aconway 2017-10-26: scheduled tasks, heartbeats end ensure @lock.synchronize do if (@running -= 1) > 0 work_wake nil # Signal the next thread else @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop end end end |
#running ⇒ Bool
Number of threads in #run
163 |
# File 'lib/core/container.rb', line 163 def running; @lock.synchronize { @running }; end |
#stop(error = nil) ⇒ Object
Stop the container.
Close all listeners and abort all connections without doing AMQP protocol close.
#stop returns immediately, calls to #run will return when all activity is finished.
The container can no longer be used, using a stopped container raises StoppedError on attempting. Create a new container if you want to resume activity.
295 296 297 298 299 300 301 302 303 304 305 306 307 |
# File 'lib/core/container.rb', line 295 def stop(error=nil) @lock.synchronize do raise StoppedError if @stopped @stopped = true @stop_err = Condition.convert(error) check_stop_lh # NOTE: @stopped => # - no new run threads can join # - no more select calls after next wakeup # - once @active == 0, all threads will be stopped with nil end wake end |