Class: Qpid::Proton::Container

Inherits:
Object
  • Object
Defined in:
lib/core/container.rb

Overview

An AMQP container manages a set of Listeners and Connections which contain #Sender and #Receiver links to transfer messages. Usually, each AMQP client or server process has a single container for all of its connections and links.

One or more threads can call #run. Events generated by all the listeners and connections are dispatched in the #run threads.

Direct Known Subclasses

Reactor::Container

Defined Under Namespace

Classes: ConnectionTask, ListenTask, StoppedError

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(id = nil) ⇒ Container #initialize(handler = nil, id = nil) ⇒ Container

Create a new Container

Overloads:

  • #initialize(id = nil) ⇒ Container

    Parameters:

    • id (String, Symbol) (defaults to: nil)

      A unique ID for this container. If nil, a random UUID is used.

  • #initialize(handler = nil, id = nil) ⇒ Container

    Parameters:

    • id (String, Symbol) (defaults to: nil)

      A unique ID for this container. If nil, a random UUID is used.

    • handler (MessagingHandler) (defaults to: nil)

      Optional default handler for connections that do not have their own handler (see #connect and #listen)

      Note: For multi-threaded code, it is recommended to use a separate handler instance for each connection, as a shared handler may be called concurrently.



# File 'lib/core/container.rb', line 108

def initialize(*args)
  case args.size
  when 2 then @handler, @id = args
  when 1 then
    @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
    @handler = args[0] unless @id
  when 0 # No arguments: no default handler, random ID assigned below
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  # Use an empty messaging adapter to give default behaviour if there's no global handler.
  @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
  @id = (@id || SecureRandom.uuid).freeze

  # Implementation note:
  #
  # - #run threads take work from @work
  # - Each driver and the Container itself is processed by at most one #run thread at a time
  # - The Container thread does IO.select
  # - nil on the @work queue makes a #run thread exit

  @work = Queue.new
  @work << :start << self   # Issue start, then start selecting
  @wake = IO.pipe           # Wakes #run thread in IO.select
  @auto_stop = true         # Stop when @active drops to 0

  # Following instance variables protected by lock
  @lock = Mutex.new
  @active = 0               # All active tasks, in @selectable, @work or being processed
  @selectable = Set.new     # Tasks ready to block in IO.select
  @running = 0              # Count of #run threads
  @stopped = false          # #stop called
  @stop_err = nil           # Optional error to pass to tasks, from #stop
end
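
The single-argument case is the subtle part of the constructor: a String-like object or a Symbol is taken as the ID, and any other object is taken as the handler. The following is a minimal stand-alone sketch of that dispatch. The helper name `split_container_args` is hypothetical and not part of the library, and the sketch assumes that zero arguments are accepted, as the documented defaults suggest.

```ruby
require 'securerandom'

# Hypothetical helper that imitates the constructor's argument handling.
# Returns [handler, id]. Illustration only, not library API.
def split_container_args(*args)
  handler = id = nil
  case args.size
  when 0 # nothing supplied: no handler, random ID assigned below
  when 1
    # String-like objects and Symbols are IDs; anything else is a handler.
    id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a?(Symbol))
    handler = args[0] unless id
  when 2 then handler, id = args
  else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
  end
  [handler, (id || SecureRandom.uuid).freeze]
end

handler = Object.new                      # stand-in for a MessagingHandler
p split_container_args("my-container")    # ID only
p split_container_args(:worker)           # a Symbol ID is converted to a String
p split_container_args(handler).last      # handler only: the ID is a random UUID
```

Passing both arguments explicitly, handler first, avoids any ambiguity when the handler object happens to respond to #to_str.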

Instance Attribute Details

#auto_stop ⇒ Bool

Auto-stop flag.

True (the default) means that the container will stop automatically, as if #stop had been called, when the last listener or connection closes.

False means #run will not return unless #stop is called.

Returns:

  • (Bool)

    auto-stop state



# File 'lib/core/container.rb', line 155

def auto_stop
  @auto_stop
end
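
The auto-stop rule can be pictured as a counter of open listeners and connections that triggers a stop when it reaches zero. The sketch below is a toy model under that assumption. The class `ActivityTracker` and its methods are invented for illustration and do not reflect the library's internals.

```ruby
# Toy model of the auto-stop rule. Not the library's implementation.
class ActivityTracker
  attr_accessor :auto_stop
  attr_reader :stopped

  def initialize
    @lock = Mutex.new
    @active = 0          # listeners and connections currently open
    @auto_stop = true    # same default as the container
    @stopped = false
  end

  # Called when a listener or connection is added.
  def opened
    @lock.synchronize { @active += 1 }
  end

  # Called when a listener or connection closes.
  def closed
    @lock.synchronize do
      @active -= 1
      @stopped = true if @active.zero? && @auto_stop
    end
  end
end

tracker = ActivityTracker.new
tracker.opened
tracker.closed
puts tracker.stopped     # auto_stop is on, so the last close stops the tracker
```

With `auto_stop` set to false the tracker stays alive at zero activity, which corresponds to #run not returning until #stop is called.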

#handler ⇒ MessagingHandler (readonly)

Returns the container-wide handler.

Returns:

  • (MessagingHandler)

    the container-wide handler



# File 'lib/core/container.rb', line 142

def handler
  @handler
end

#id ⇒ String (readonly)

Returns unique identifier for this container.

Returns:

  • (String)

    unique identifier for this container



# File 'lib/core/container.rb', line 145

def id
  @id
end

#stopped ⇒ Bool

True if the container has been stopped and can no longer be used.

Returns:

  • (Bool)

    stopped state



# File 'lib/core/container.rb', line 159

def stopped
  @stopped
end
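
A stopped flag of this kind is typically checked at the start of every operation that needs a live object. The sketch below shows that guard pattern in isolation. `StoppableService` and its `connect` method are illustrative stand-ins, not the container's own code.

```ruby
# Toy model of a "stopped" guard. Class and method names are illustrative.
class StoppableService
  class StoppedError < StandardError; end

  def initialize
    @lock = Mutex.new
    @stopped = false
  end

  # Read the flag under the lock so other threads see a consistent value.
  def stopped
    @lock.synchronize { @stopped }
  end

  # Stopping twice is treated as an error, as is any later use.
  def stop
    @lock.synchronize do
      raise StoppedError, "already stopped" if @stopped
      @stopped = true
    end
  end

  # Any operation that needs a live service checks the flag first.
  def connect
    raise StoppedError, "service is stopped" if stopped
    :connected
  end
end

service = StoppableService.new
p service.connect
service.stop
begin
  service.connect
rescue StoppableService::StoppedError => e
  puts "refused: #{e.message}"
end
```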

Instance Method Details

#connect(url, opts = nil) ⇒ Connection

Open an AMQP connection.

url.scheme must be “amqp” or “amqps”; url.scheme.nil? is treated as “amqp”. url.user and url.password are used as defaults if opts[:user] and opts[:password] are nil.

Parameters:

  • url (String, URI)

    Open a TCPSocket to url.host, url.port.

Options Hash (opts):

  • :handler (MessagingHandler)

    handler for events related to this connection.

  • :user (String)

    User name for authentication

  • :password (String)

    Authentication secret

  • :virtual_host (String)

    Virtual host name

  • :container_id (String) — default: provided by {Container}

    override advertised container-id

  • :properties (Hash<Symbol=>Object>)

    Application-defined properties

  • :offered_capabilities (Array<Symbol>)

    Extensions the endpoint supports

  • :desired_capabilities (Array<Symbol>)

    Extensions the endpoint can use

  • :idle_timeout (Numeric)

    Seconds before closing an idle connection

  • :max_sessions (Integer)

    Limit the number of active sessions

  • :max_frame_size (Integer)

    Limit the size of AMQP frames

  • :sasl_enabled (Boolean) — default: false

    Enable or disable SASL.

  • :sasl_allow_insecure_mechs (Boolean) — default: false

    Allow mechanisms that send secrets in clear text

  • :sasl_allowed_mechs (String)

    SASL mechanisms allowed by this end of the connection

  • :ssl_domain (SSLDomain)

    SSL configuration domain.

Returns:

  • (Connection)

    the newly opened connection



# File 'lib/core/container.rb', line 172

def connect(url, opts=nil)
  not_stopped
  url = Qpid::Proton::uri url
  opts ||= {}
  if url.user ||  url.password
    opts[:user] ||= url.user
    opts[:password] ||= url.password
  end
  opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
  connect_io(TCPSocket.new(url.host, url.port), opts)
end
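
The way URL credentials act as defaults for the options hash can be tried without a broker. The sketch below uses the standard library URI parser instead of Qpid::Proton.uri, and the helper name `with_url_credentials` is hypothetical. Unlike #connect, it copies the options hash rather than modifying the caller's.

```ruby
require 'uri'

# Hypothetical helper: merges credentials from an AMQP URL into an options
# hash in the same way as #connect. Uses the stdlib URI parser.
def with_url_credentials(url, opts = nil)
  url = URI(url.to_s)
  opts = (opts || {}).dup            # copy, so the caller's hash is untouched
  if url.user || url.password
    opts[:user] ||= url.user         # explicit options win over the URL
    opts[:password] ||= url.password
  end
  opts
end

p with_url_credentials("amqp://alice:secret@broker.example:5672")
p with_url_credentials("amqp://alice:secret@broker.example:5672", { user: "bob" })
```

In the second call the explicit :user is kept while the password still comes from the URL, because each key is defaulted independently.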

#connect_io(io, opts = nil) ⇒ Object

Open an AMQP protocol connection on an existing IO object

Parameters:

  • io (IO)

    An existing IO object, e.g. a TCPSocket

Options Hash (opts):

  • :handler (MessagingHandler)

    handler for events related to this connection.

  • :user (String)

    User name for authentication

  • :password (String)

    Authentication secret

  • :virtual_host (String)

    Virtual host name

  • :container_id (String) — default: provided by {Container}

    override advertised container-id

  • :properties (Hash<Symbol=>Object>)

    Application-defined properties

  • :offered_capabilities (Array<Symbol>)

    Extensions the endpoint supports

  • :desired_capabilities (Array<Symbol>)

    Extensions the endpoint can use

  • :idle_timeout (Numeric)

    Seconds before closing an idle connection

  • :max_sessions (Integer)

    Limit the number of active sessions

  • :max_frame_size (Integer)

    Limit the size of AMQP frames

  • :sasl_enabled (Boolean) — default: false

    Enable or disable SASL.

  • :sasl_allow_insecure_mechs (Boolean) — default: false

    Allow mechanisms that send secrets in clear text

  • :sasl_allowed_mechs (String)

    SASL mechanisms allowed by this end of the connection

  • :ssl_domain (SSLDomain)

    SSL configuration domain.



# File 'lib/core/container.rb', line 187

def connect_io(io, opts=nil)
  not_stopped
  cd = connection_driver(io, opts)
  cd.connection.open()
  add(cd)
  cd.connection
end

#listen(url, handler = Listener::Handler.new) ⇒ Listener

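Listen for incoming AMQP connections.

Parameters:

  • url (String, URI)

    Listen on url.host, url.port.

  • handler (Listener::Handler) (defaults to: Listener::Handler.new)

    Handler that is called with events for this listener and can generate a new set of options for each accepted connection.

Returns:

  • (Listener)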



# File 'lib/core/container.rb', line 202

def listen(url, handler=Listener::Handler.new)
  not_stopped
  url = Qpid::Proton::uri url
  # TODO aconway 2017-11-01: amqps, SSL
  listen_io(TCPServer.new(url.host, url.port), handler)
end

#listen_io(io, handler = Listener::Handler.new) ⇒ Object

Listen for incoming AMQP connections on an existing server socket.

Parameters:

  • io

    A server socket, for example a TCPServer

  • handler (Listener::Handler) (defaults to: Listener::Handler.new)

    Handler for events from this listener



# File 'lib/core/container.rb', line 213

def listen_io(io, handler=Listener::Handler.new)
  not_stopped
  l = ListenTask.new(io, handler, self)
  add(l)
  l
end
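
One reason to create the server socket yourself is to let the operating system choose a free port, which is handy in tests. The sketch below prepares such a socket with the standard library only. The final hand-over to #listen_io is shown as a comment because it assumes a `container` object that is not created here.

```ruby
require 'socket'

# Bind to port 0 so the operating system picks a free port.
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]                 # the port that was actually bound
puts "amqp://127.0.0.1:#{port}"

# With the library loaded, the socket would then be handed over, e.g.
#   listener = container.listen_io(server)   # `container` is assumed to exist
server.close
```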

#run ⇒ Object

Run the container: wait for IO activity, dispatch events to handlers.

More than one thread can call #run concurrently; the container uses all the #run threads as a thread pool. Calls to Handler::MessagingHandler methods are serialized for each connection or listener, even if the container has multiple threads.



# File 'lib/core/container.rb', line 227

def run
  @lock.synchronize do
    @running += 1        # Note: ensure clause below will decrement @running
    raise StoppedError if @stopped
  end
  while task = @work.pop
    case task

    when :start
      @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start

    when Container
      r, w = [@wake[0]], []
      @lock.synchronize do
        @selectable.each do |s|
          r << s if s.send :can_read?
          w << s if s.send :can_write?
        end
      end
      r, w = IO.select(r, w)
      selected = Set.new(r).merge(w)
      drain_wake if selected.delete?(@wake[0])
      stop_select = nil
      @lock.synchronize do
        if stop_select = @stopped # close everything
          selected += @selectable
          selected.each { |s| s.close @stop_err }
          @wake.each { |fd| fd.close() }
        end
        @selectable -= selected # Remove selected tasks
      end
      selected.each { |s| @work << s } # Queue up tasks needing #process
      @work << self unless stop_select

    when ConnectionTask then
      task.process
      rearm task

    when ListenTask then
      io, opts = task.process
      add(connection_driver(io, opts, true)) if io
      rearm task
    end
    # TODO aconway 2017-10-26: scheduled tasks, heartbeats
  end
ensure
  @lock.synchronize do
    if (@running -= 1) > 0
      work_wake nil         # Signal the next thread
    else
      @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
    end
  end
end
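
The shutdown hand-off in the ensure clause, where an exiting thread signals the next one, can be reproduced with a plain Queue. The sketch below is a toy worker pool in which a nil on the queue ends a thread's loop and each exiting thread passes the nil on. The name `run_pool` is illustrative, and the sketch leaves out IO.select and per-task rearming entirely.

```ruby
# Toy worker pool: several threads pop tasks from one Queue, and a nil on the
# queue tells a thread to exit. Names here are illustrative, not library API.
def run_pool(tasks, thread_count)
  work = Queue.new
  results = Queue.new
  tasks.each { |t| work << t }
  work << nil                          # a single sentinel is enough, see below

  threads = Array.new(thread_count) do
    Thread.new do
      begin
        while (task = work.pop)        # nil ends the loop
          results << task.call
        end
      ensure
        work << nil                    # pass the sentinel on to the next thread
      end
    end
  end
  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

squares = run_pool((1..5).map { |n| -> { n * n } }, 3)
p squares.sort
```

Because each task is removed from the queue before it is processed, no two threads ever work on the same task at once, which is the same property that serializes handler calls per connection.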

#running ⇒ Integer

Number of threads currently in #run.

Returns:

  • (Integer)

    #run thread count



# File 'lib/core/container.rb', line 163

def running; @lock.synchronize { @running }; end

#stop(error = nil) ⇒ Object

Stop the container.

Close all listeners and abort all connections without doing AMQP protocol close.

#stop returns immediately; calls to #run return once all activity has finished.

After #stop the container can no longer be used: attempting to use a stopped container raises StoppedError. Create a new container if you want to resume activity.

Parameters:

  • error (Condition) (defaults to: nil)

    Optional transport/listener error condition



# File 'lib/core/container.rb', line 295

def stop(error=nil)
  @lock.synchronize do
    raise StoppedError if @stopped
    @stopped = true
    @stop_err = Condition.convert(error)
    check_stop_lh
    # NOTE: @stopped =>
    # - no new run threads can join
    # - no more select calls after next wakeup
    # - once @active == 0, all threads will be stopped with nil
  end
  wake
end
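
#stop can return immediately because it only sets flags and then wakes the thread blocked in IO.select. The constructor creates an IO.pipe for this purpose. The sketch below shows the general self-pipe technique on its own. The variable names are illustrative, and it is an assumption that the library's wake call works along these lines.

```ruby
# Self-pipe sketch: a thread blocked in IO.select is woken by writing one
# byte to a pipe. Variable names are illustrative, not library internals.
reader, writer = IO.pipe

selector = Thread.new do
  ready, = IO.select([reader])         # blocks until the pipe is readable
  ready.first.read_nonblock(16)        # drain the wake-up byte(s)
end

writer.write_nonblock("x")             # the "wake" call: returns immediately
woken_by = selector.value              # join the thread and fetch what it read
puts "selector woke after reading #{woken_by.inspect}"

[reader, writer].each(&:close)
```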