Class: Qpid::Proton::Reactor::Container
- Includes:
- Util::Reactor, Util::UUID
- Defined in:
- lib/reactor/container.rb
Overview
A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.
This is an extension to the Reactor classthat adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.
Constant Summary
Constants inherited from Reactor
Instance Attribute Summary collapse
-
#container_id ⇒ Object
Returns the value of attribute container_id.
-
#global_handler ⇒ Object
Returns the value of attribute global_handler.
Attributes inherited from Reactor
Instance Method Summary collapse
- #_apply_link_options(options, link) ⇒ Object
- #_session(context) ⇒ Object
-
#connect(options = {}) ⇒ Object
Initiates the establishment of an AMQP connection.
-
#create_receiver(context, opts = {}) ⇒ Receiver
Initiates the establishment of a link over which messages can be received.
-
#create_sender(context, opts = {}) ⇒ Sender
Initiates the establishment of a link over which messages can be sent.
- #declare_transaction(context, handler = nil, settle_before_discharge = false) ⇒ Object
- #do_work(timeout = nil) ⇒ Object
- #id(container, remote, local) ⇒ Object
-
#initialize(handlers, options = {}) ⇒ Container
constructor
A new instance of Container.
-
#listen(url, ssl_domain = nil) ⇒ Object
Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.
- #to_s ⇒ Object
Methods included from Util::UUID
Methods included from Util::Reactor
Methods inherited from Reactor
#acceptor, #connection, #handler, #handler=, #on_error, #process, #push_event, #quiesced?, #run, #schedule, #selectable, #timeout, #timeout=, #update, #wakeup, wrap
Methods included from Util::Wrapper
#impl, #impl=, included, registry
Methods included from Util::Timeout
#millis_to_sec, #millis_to_timeout, #sec_to_millis, #timeout_to_millis
Methods included from Util::SwigHelper
Methods included from Util::Handler
Constructor Details
#initialize(handlers, options = {}) ⇒ Container
Returns a new instance of Container.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/reactor/container.rb', line 58 def initialize(handlers, = {}) super(handlers, ) # only do the following if we're creating a new instance if !.has_key?(:impl) @ssl = SSLConfig.new if [:global_handler] self.global_handler = GlobalOverrides.new([:global_handler]) else # very ugly, but using self.global_handler doesn't work in the constructor ghandler = Reactor.instance_method(:global_handler).bind(self).call ghandler = GlobalOverrides.new(ghandler) Reactor.instance_method(:global_handler=).bind(self).call(ghandler) end @trigger = nil @container_id = generate_uuid end end |
Instance Attribute Details
#container_id ⇒ Object
Returns the value of attribute container_id.
55 56 57 |
# File 'lib/reactor/container.rb', line 55 def container_id @container_id end |
#global_handler ⇒ Object
Returns the value of attribute global_handler.
56 57 58 |
# File 'lib/reactor/container.rb', line 56 def global_handler @global_handler end |
Instance Method Details
#_apply_link_options(options, link) ⇒ Object
256 257 258 259 260 261 262 263 264 |
# File 'lib/reactor/container.rb', line 256 def (, link) if !.nil? && !.empty? if !.is_a?(::List) = [Options].flatten end .each {|option| o.apply(link) if o.test(link)} end end |
#_session(context) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/reactor/container.rb', line 110 def _session(context) if context.is_a?(Qpid::Proton::URL) return self._session(self.connect(:url => context)) elsif context.is_a?(Qpid::Proton::Session) return context elsif context.is_a?(Qpid::Proton::Connection) if context.session_policy? return context.session_policy.session(context) else return self.create_session(context) end else return context.session end end |
#connect(options = {}) ⇒ Object
Initiates the establishment of an AMQP connection.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/reactor/container.rb', line 81 def connect( = {}) conn = self.connection([:handler]) conn.container = self.container_id || generate_uuid connector = Connector.new(conn) conn.overrides = connector if ![:url].nil? connector.address = URLs.new([[:url]]) elsif ![:urls].nil? connector.address = URLs.new([:urls]) elsif ![:address].nil? connector.address = URLs.new([Qpid::Proton::URL.new([:address])]) else raise ::ArgumentError.new("either :url or :urls or :address required") end connector.heartbeat = [:heartbeat] if ![:heartbeat].nil? if ![:reconnect].nil? connector.reconnect = [:reconnect] else connector.reconnect = Backoff.new() end connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable conn.open return conn end |
#create_receiver(context, opts = {}) ⇒ Receiver
Initiates the establishment of a link over which messages can be received.
There are two accepted arguments for the context
1. If a Connection is supplied then the link is established using that
object. The source, and optionally the target, address can be supplied
2. If it is a String or a URL then a new Connection is created on which
the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the target address.
The name will be generated for the link if one is not specified.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/reactor/container.rb', line 185 def create_receiver(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end source = opts[:source] if context.is_a?(Qpid::Proton::URL) && source.nil? source = context.path end session = self._session(context) receiver = session.receiver(opts[:name] || id(session.connection.container, source, opts[:target])) receiver.source.address = source if source receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] receiver.target.address = opts[:target] if !opts[:target].nil? receiver.handler = opts[:handler] if !opts[:handler].nil? self.(opts[:options], receiver) receiver.open return receiver end |
#create_sender(context, opts = {}) ⇒ Sender
Initiates the establishment of a link over which messages can be sent.
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/reactor/container.rb', line 139 def create_sender(context, opts = {}) if context.is_a?(::String) context = Qpid::Proton::URL.new(context) end target = opts[:target] if context.is_a?(Qpid::Proton::URL) && target.nil? target = context.path end session = self._session(context) sender = session.sender(opts[:name] || id(session.connection.container, target, opts[:source])) sender.source.address = opts[:source] if !opts[:source].nil? sender.target.address = target if target sender.handler = opts[:handler] if !opts[:handler].nil? sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? self.(opts[:options], sender) sender.open return sender end |
#declare_transaction(context, handler = nil, settle_before_discharge = false) ⇒ Object
209 210 211 212 213 214 215 216 217 218 |
# File 'lib/reactor/container.rb', line 209 def declare_transaction(context, handler = nil, settle_before_discharge = false) if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? class << context attr_accessor :txn_ctl end context.txn_ctl = self.create_sender(context, nil, "txn-ctl", InternalTransactionHandler.new()) end return Transaction.new(context.txn_ctl, handler, settle_before_discharge) end |
#do_work(timeout = nil) ⇒ Object
239 240 241 242 |
# File 'lib/reactor/container.rb', line 239 def do_work(timeout = nil) self.timeout = timeout unless timeout.nil? self.process end |
#id(container, remote, local) ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/reactor/container.rb', line 244 def id(container, remote, local) if !local.nil? && !remote.nil? "#{container}-#{remote}-#{local}" elsif !local.nil? "#{container}-#{local}" elsif !remote.nil? "#{container}-#{remote}" else "#{container}-#{generate_uuid}" end end |
#listen(url, ssl_domain = nil) ⇒ Object
Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.
226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/reactor/container.rb', line 226 def listen(url, ssl_domain = nil) url = Qpid::Proton::URL.new(url) acceptor = self.acceptor(url.host, url.port) ssl_config = ssl_domain if ssl_config.nil? && (url.scheme == 'amqps') && @ssl ssl_config = @ssl.server end if !ssl_config.nil? acceptor.ssl_domain(ssl_config) end return acceptor end |
#to_s ⇒ Object
266 267 268 |
# File 'lib/reactor/container.rb', line 266 def to_s "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" end |