Class: Qpid::Proton::Connection
- Includes:
- Util::Deprecation, Util::Wrapper
- Defined in:
- lib/core/connection.rb
Overview
An AMQP connection.
Constant Summary collapse
- PROTON_METHOD_PREFIX =
"pn_connection"
Constants included from Util::Deprecation
Util::Deprecation::DEPRECATE_FULL_TRACE, Util::Deprecation::MATCH_DIR
Constants inherited from Endpoint
Endpoint::LOCAL_ACTIVE, Endpoint::LOCAL_CLOSED, Endpoint::LOCAL_MASK, Endpoint::LOCAL_UNINIT, Endpoint::REMOTE_ACTIVE, Endpoint::REMOTE_CLOSED, Endpoint::REMOTE_MASK, Endpoint::REMOTE_UNINIT
Instance Attribute Summary collapse
-
#container ⇒ Container
readonly
The container managing this connection.
-
#hostname ⇒ Object
deprecated
Deprecated.
use #virtual_host
-
#work_queue ⇒ WorkQueue
readonly
Work queue to execute code serialized correctly for this connection.
Attributes included from Util::Wrapper
Class Method Summary collapse
Instance Method Summary collapse
- #apply(opts) ⇒ Object
-
#close(error = nil) ⇒ Object
Closes the local end of the connection.
-
#connection ⇒ Connection
Self.
-
#container_id ⇒ Object
AMQP container ID advertised by the remote peer. To get the local container ID use #container and Qpid::Proton::Container#id.
-
#default_session ⇒ Session
Returns the default session for this connection.
-
#desired_capabilities ⇒ Array<Symbol>
Desired capabilities provided by the remote peer.
-
#each_link ⇒ Object
Get the links on this connection.
-
#each_receiver ⇒ Object
Get the Receiver links - see #each_link.
-
#each_sender ⇒ Object
Get the Sender links - see #each_link.
-
#each_session(&block) ⇒ Object
Get the sessions on this connection.
- #error ⇒ Object deprecated Deprecated.
-
#idle_timeout ⇒ Numeric?
Idle-timeout advertised by the remote peer, in seconds.
-
#initialize(impl = Cproton.pn_connection) ⇒ Connection
constructor
A new instance of Connection.
-
#link_head(mask) ⇒ Object
deprecated
Deprecated.
use #each_link
- #link_name ⇒ Object
-
#max_frame_size ⇒ Integer?
Maximum frame size, in bytes, advertised by the remote peer.
-
#max_sessions ⇒ Integer?
Session limit advertised by the remote peer.
-
#offered_capabilities ⇒ Array<Symbol>
Offered capabilities provided by the remote peer.
-
#open(opts = nil) ⇒ Object
Open the local end of the connection.
-
#open_receiver(opts = nil) ⇒ Object
Open a receiver on the default_session.
-
#open_sender(opts = nil) ⇒ Object
Open a sender on the default_session.
-
#open_session ⇒ Object
Open a new session on this connection.
-
#overrides? ⇒ Boolean
deprecated
Deprecated.
no replacement
-
#properties ⇒ Hash
Connection-properties provided by the remote peer.
-
#session_head(mask) ⇒ Object
deprecated
Deprecated.
use #each_session
-
#session_policy? ⇒ Boolean
deprecated
Deprecated.
no replacement
-
#state ⇒ Integer
Gets the endpoint current state flags.
-
#transport ⇒ Transport?
Transport bound to this connection, or nil if unbound.
-
#user ⇒ String
User name used for authentication (outgoing connection) or the authenticated user name (incoming connection).
-
#virtual_host ⇒ String
The AMQP hostname for the connection.
-
#work_head ⇒ Object
deprecated
Deprecated.
use #MessagingHandler to handle work
Methods included from Util::Deprecation
deprecated, #deprecated, included
Methods included from Util::Wrapper
included, #inspect, registry, #to_s
Methods inherited from Endpoint
#check_state, #closed?, #condition, #local_closed?, #local_condition, #local_open?, #local_uninit?, #open?, #remote_closed?, #remote_condition, #remote_open?, #remote_uninit?
Constructor Details
#initialize(impl = Cproton.pn_connection) ⇒ Connection
Returns a new instance of Connection.
34 35 36 37 38 39 40 41 42 |
# File 'lib/core/connection.rb', line 34
# Build a Connection wrapper around an underlying connection object.
#
# @param impl the underlying connection implementation; when omitted a new
#   one is obtained from Cproton.pn_connection
def initialize(impl = Cproton.pn_connection)
  super()
  @impl = impl
  # State used by #link_name to generate link names.
  @link_prefix = ""
  @link_count = 0
  @overrides = nil
  @session_policy = nil
  # Register this wrapper under the same key that .wrap looks up.
  self.class.store_instance(self, :pn_connection_attachments)
end
Instance Attribute Details
#container ⇒ Container (readonly)
Returns the container managing this connection.
76 77 78 |
# File 'lib/core/connection.rb', line 76
# Reader for the container managing this connection.
#
# @return [Container] the current value of @container (assigned from the
#   :container option in #apply)
def container
  @container
end
#hostname ⇒ Object
use #virtual_host
50 |
# File 'lib/core/connection.rb', line 50 proton_set_get :hostname |
#work_queue ⇒ WorkQueue (readonly)
Returns work queue to execute code serialized correctly for this connection.
291 292 293 |
# File 'lib/core/connection.rb', line 291
# Reader for the work queue associated with this connection.
#
# @return [WorkQueue] the current value of @work_queue
def work_queue
  @work_queue
end
Class Method Details
.wrap(impl) ⇒ Object
28 29 30 31 |
# File 'lib/core/connection.rb', line 28
# Obtain the Connection wrapper for an underlying connection object,
# reusing a previously stored wrapper when one is found.
#
# @param impl the underlying connection implementation, may be nil
# @return [Connection, nil] nil when impl is nil
def self.wrap(impl)
  unless impl.nil?
    existing = self.fetch_instance(impl, :pn_connection_attachments)
    existing || Connection.new(impl)
  end
end
Instance Method Details
#apply(opts) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/core/connection.rb', line 128
# Apply connection options to the underlying connection.
#
# NOTE: Only connection options are set here.
# Transport options must be applied with {Transport#apply}
#
# @param opts options indexed by symbol; the keys read here are :container,
#   :container_id, :user, :password, :virtual_host, :link_prefix,
#   :offered_capabilities, :desired_capabilities and :properties
def apply(opts)
  @container = opts[:container]

  # Container ID precedence: explicit option, then the container's own id,
  # then a freshly generated UUID.
  container_id = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
  # Allow symbols as container name
  container_id = container_id.to_s if container_id.is_a?(Symbol)
  Cproton.pn_connection_set_container(@impl, container_id)

  # Credentials and virtual host are only set when supplied.
  if opts[:user]
    Cproton.pn_connection_set_user(@impl, opts[:user])
  end
  if opts[:password]
    Cproton.pn_connection_set_password(@impl, opts[:password])
  end
  if opts[:virtual_host]
    Cproton.pn_connection_set_hostname(@impl, opts[:virtual_host])
  end

  # Link names default to being prefixed with the container ID.
  @link_prefix = opts[:link_prefix] || container_id

  offered = Cproton.pn_connection_offered_capabilities(@impl)
  Codec::Data.from_object(offered, opts[:offered_capabilities])
  desired = Cproton.pn_connection_desired_capabilities(@impl)
  Codec::Data.from_object(desired, opts[:desired_capabilities])
  props = Cproton.pn_connection_properties(@impl)
  Codec::Data.from_object(props, opts[:properties])
end
#close(error = nil) ⇒ Object
Closes the local end of the connection. The remote end may or may not be closed.
174 175 176 177 |
# File 'lib/core/connection.rb', line 174
# Closes the local end of the connection. The remote end may or may not
# be closed.
#
# @param error optional error, assigned to the local condition before the
#   close is issued
def close(error = nil)
  local = _local_condition
  Condition.assign(local, error)
  Cproton.pn_connection_close(@impl)
end
#connection ⇒ Connection
Returns self.
65 |
# File 'lib/core/connection.rb', line 65
# @return [Connection] self
def connection
  self
end
#container_id ⇒ Object
AMQP container ID advertised by the remote peer. To get the local container ID use #container and Qpid::Proton::Container#id
72 |
# File 'lib/core/connection.rb', line 72
# Container ID of the remote peer, as reported by
# Cproton.pn_connection_remote_container.
#
# To get the local container ID use #container and Container#id.
def container_id
  Cproton.pn_connection_remote_container(@impl)
end
#default_session ⇒ Session
Returns the default session for this connection.
196 197 198 |
# File 'lib/core/connection.rb', line 196
# Returns the default session for this connection, opening it with
# #open_session on first use and caching it in @session.
#
# @return [Session]
def default_session
  @session = open_session unless @session
  @session
end
#desired_capabilities ⇒ Array<Symbol>
Returns desired capabilities provided by the remote peer.
86 87 88 89 |
# File 'lib/core/connection.rb', line 86
# Desired capabilities provided by the remote peer.
#
# @return [Array<Symbol>]
def desired_capabilities
  remote = Cproton.pn_connection_remote_desired_capabilities(@impl)
  # Provide capabilities consistently as an array, even if encoded as a
  # single symbol
  Codec::Data.to_multiple(remote)
end
#each_link {|l| ... } ⇒ Object #each_link ⇒ Enumerator
Get the links on this connection.
250 251 252 253 254 255 256 257 258 259 |
# File 'lib/core/connection.rb', line 250
# Get the links on this connection.
#
# @yield [l] each link, wrapped with Link.wrap
# @return [Enumerator] when called without a block
# @return [self] when called with a block
def each_link
  return enum_for(:each_link) unless block_given?

  link = Cproton.pn_link_head(@impl, 0)
  while link
    # Fetch the successor before yielding, in case the block closes the
    # current link and unlinks it.
    following = Cproton.pn_link_next(link, 0)
    yield Link.wrap(link)
    link = following
  end
  self
end
#each_receiver ⇒ Object
Get the Receiver links - see #each_link
268 269 270 271 |
# File 'lib/core/connection.rb', line 268
# Get the Receiver links - see #each_link
#
# @yield [l] each link for which receiver? is true
# @return [Enumerator] when called without a block
# @return [Array] with a block: the result of select, i.e. the receiver
#   links for which the block returned a truthy value
def each_receiver
  return enum_for(:each_receiver) unless block_given?

  each_link.select do |link|
    next unless link.receiver?

    yield link
  end
end
#each_sender ⇒ Object
Get the Sender links - see #each_link
262 263 264 265 |
# File 'lib/core/connection.rb', line 262
# Get the Sender links - see #each_link
#
# @yield [l] each link for which sender? is true
# @return [Enumerator] when called without a block
# @return [Array] with a block: the result of select, i.e. the sender
#   links for which the block returned a truthy value
def each_sender
  return enum_for(:each_sender) unless block_given?

  each_link.select do |link|
    next unless link.sender?

    yield link
  end
end
#each_session {|s| ... } ⇒ Object #each_session ⇒ Enumerator
Get the sessions on this connection.
229 230 231 232 233 234 235 236 237 |
# File 'lib/core/connection.rb', line 229
# Get the sessions on this connection.
#
# @yield [s] each session, wrapped with Session.wrap
# @return [Enumerator] when called without a block
# @return [self] when called with a block
def each_session(&block)
  return enum_for(:each_session) unless block_given?

  session = Cproton.pn_session_head(@impl, 0)
  while session
    # Fetch the successor before yielding, in case the block closes the
    # current session and unlinks it (same approach as #each_link).
    following = Cproton.pn_session_next(session, 0)
    yield Session.wrap(session)
    session = following
  end
  self
end
#error ⇒ Object
280 281 282 283 |
# File 'lib/core/connection.rb', line 280
# @deprecated use #condition
# @return the error code reported by Cproton.pn_error_code for this
#   connection's error
def error
  deprecated __method__, "#condition"
  connection_error = Cproton.pn_connection_error(@impl)
  Cproton.pn_error_code(connection_error)
end
#idle_timeout ⇒ Numeric?
Idle-timeout advertised by the remote peer, in seconds.
147 148 149 150 151 |
# File 'lib/core/connection.rb', line 147
# Idle-timeout advertised by the remote peer, in seconds.
#
# @return [Numeric, nil] nil when there is no transport or the transport
#   reports no remote idle timeout
def idle_timeout
  return nil unless transport

  millis = transport.remote_idle_timeout
  return nil unless millis

  # Convert to seconds; Rational is more precise than Float.
  Rational(millis, 1000)
end
#link_head(mask) ⇒ Object
use #each_link
240 241 242 243 |
# File 'lib/core/connection.rb', line 240
# @deprecated use #each_link
# @param mask passed through to Cproton.pn_link_head
# @return the head link, wrapped with Link.wrap
def link_head(mask)
  deprecated __method__, "#each_link"
  head = Cproton.pn_link_head(@impl, mask)
  Link.wrap(head)
end
#link_name ⇒ Object
286 287 288 |
# File 'lib/core/connection.rb', line 286
# Generate the next link name for this connection.
#
# Increments @link_count and returns "<prefix>/<count>", with the count
# rendered in base 32. Interpolation is used so that a non-String
# @link_prefix (e.g. a Symbol given as the :link_prefix option) works too.
#
# @return [String]
def link_name
  @link_count += 1
  "#{@link_prefix}/#{@link_count.to_s(32)}"
end
#max_frame_size ⇒ Integer?
Maximum frame size, in bytes, advertised by the remote peer. See :max_frame_size
166 167 168 169 170 |
# File 'lib/core/connection.rb', line 166
# Maximum frame size, in bytes, advertised by the remote peer.
#
# @return [Integer, nil] nil when the transport reports zero
# @raise [StateError] if no transport is bound to this connection
def max_frame_size
  unless transport
    raise StateError, "connection not bound to transport"
  end

  frame_size = transport.remote_max_frame
  frame_size.zero? ? nil : frame_size
end
#max_sessions ⇒ Integer?
Session limit advertised by the remote peer. See :max_sessions
156 157 158 159 160 |
# File 'lib/core/connection.rb', line 156
# Session limit advertised by the remote peer.
#
# @return [Integer, nil] nil when the transport reports zero
# @raise [StateError] if no transport is bound to this connection
def max_sessions
  unless transport
    raise StateError, "connection not bound to transport"
  end

  channel_max = transport.remote_channel_max
  channel_max.zero? ? nil : channel_max
end
#offered_capabilities ⇒ Array<Symbol>
Returns offered capabilities provided by the remote peer.
79 80 81 82 |
# File 'lib/core/connection.rb', line 79
# Offered capabilities provided by the remote peer.
#
# @return [Array<Symbol>]
def offered_capabilities
  remote = Cproton.pn_connection_remote_offered_capabilities(@impl)
  # Provide capabilities consistently as an array, even if encoded as a
  # single symbol
  Codec::Data.to_multiple(remote)
end
#open(opts = nil) ⇒ Object
Open the local end of the connection.
121 122 123 124 125 |
# File 'lib/core/connection.rb', line 121
# Open the local end of the connection.
#
# Does nothing (and returns nil) when the local end is already open.
# The check uses #local_open?, the predicate documented on Endpoint.
#
# @param opts optional connection options, handed to #apply before opening
def open(opts = nil)
  return if local_open?

  apply(opts) if opts
  Cproton.pn_connection_open(@impl)
end
#open_receiver(opts = nil) ⇒ Object
Open a receiver on the default_session
216 |
# File 'lib/core/connection.rb', line 216
# Open a receiver on the default_session.
#
# @param opts passed through to the session's open_receiver
def open_receiver(opts = nil)
  session = default_session
  session.open_receiver(opts)
end
#open_sender(opts = nil) ⇒ Object
Open a sender on the default_session
212 |
# File 'lib/core/connection.rb', line 212
# Open a sender on the default_session.
#
# @param opts passed through to the session's open_sender
def open_sender(opts = nil)
  session = default_session
  session.open_sender(opts)
end
#open_session ⇒ Object
Open a new session on this connection.
204 205 206 207 208 |
# File 'lib/core/connection.rb', line 204
# Open a new session on this connection.
#
# @return the new session (wrapped with Session.wrap), after #open has
#   been called on it
def open_session
  Session.wrap(Cproton.pn_session(@impl)).tap { |session| session.open }
end
#overrides? ⇒ Boolean
no replacement
59 |
# File 'lib/core/connection.rb', line 59
# @deprecated no replacement
# @return [Boolean] always false
def overrides?
  deprecated __method__
  false
end
#properties ⇒ Hash
Returns connection-properties provided by the remote peer.
93 94 95 |
# File 'lib/core/connection.rb', line 93
# Connection-properties provided by the remote peer.
#
# @return [Hash]
def properties
  remote = Cproton.pn_connection_remote_properties(@impl)
  Codec::Data.to_object(remote)
end
#session_head(mask) ⇒ Object
use #each_session
219 220 221 222 |
# File 'lib/core/connection.rb', line 219
# @deprecated use #each_session
# @param mask passed through to Cproton.pn_session_head
# @return the head session, wrapped with Session.wrap
def session_head(mask)
  deprecated __method__, "#each_session"
  head = Cproton.pn_session_head(@impl, mask)
  Session.wrap(head)
end
#session_policy? ⇒ Boolean
no replacement
62 |
# File 'lib/core/connection.rb', line 62
# @deprecated no replacement
# @return [Boolean] always false
def session_policy?
  deprecated __method__
  false
end
#state ⇒ Integer
Gets the endpoint current state flags
188 189 190 |
# File 'lib/core/connection.rb', line 188
# Gets the endpoint's current state flags, as reported by
# Cproton.pn_connection_state.
#
# @return [Integer]
def state
  Cproton.pn_connection_state(@impl)
end
#transport ⇒ Transport?
Returns transport bound to this connection, or nil if unbound.
68 |
# File 'lib/core/connection.rb', line 68
# Transport bound to this connection.
#
# @return [Transport, nil] the result of Transport.wrap on the underlying
#   transport; documented as nil if unbound
def transport
  bound = Cproton.pn_connection_transport(@impl)
  Transport.wrap(bound)
end
#user ⇒ String
User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
54 55 56 |
# File 'lib/core/connection.rb', line 54
# User name for this connection.
#
# Prefers the user set on the connection itself
# (Cproton.pn_connection_get_user) and otherwise falls back to the user
# reported by the bound transport, if there is one.
#
# @return [String, nil] nil when neither source provides a user
def user
  local_user = Cproton.pn_connection_get_user(@impl)
  return local_user if local_user

  bound = transport
  bound && bound.user
end
#virtual_host ⇒ String
Returns the AMQP hostname for the connection.
45 |
# File 'lib/core/connection.rb', line 45
# The AMQP hostname for the connection, read via
# Cproton.pn_connection_remote_hostname.
#
# @return [String]
def virtual_host
  Cproton.pn_connection_remote_hostname(@impl)
end
#work_head ⇒ Object
use #MessagingHandler to handle work
274 275 276 277 |
# File 'lib/core/connection.rb', line 274
# @deprecated use MessagingHandler to handle work
# @return the head of the work list, wrapped with Delivery.wrap
def work_head
  deprecated __method__
  head = Cproton.pn_work_head(@impl)
  Delivery.wrap(head)
end