Class: Qpid::Proton::Connection
- Includes:
- Util::Deprecation, Util::Wrapper
- Defined in:
- lib/core/connection.rb
Overview
An AMQP connection.
Constant Summary collapse
- PROTON_METHOD_PREFIX =
"pn_connection"
Constants included from Util::Deprecation
Util::Deprecation::DEPRECATE_FULL_TRACE, Util::Deprecation::MATCH_DIR
Constants inherited from Endpoint
Endpoint::LOCAL_ACTIVE, Endpoint::LOCAL_CLOSED, Endpoint::LOCAL_MASK, Endpoint::LOCAL_UNINIT, Endpoint::REMOTE_ACTIVE, Endpoint::REMOTE_CLOSED, Endpoint::REMOTE_MASK, Endpoint::REMOTE_UNINIT
Instance Attribute Summary collapse
-
#container ⇒ Container
readonly
The container managing this connection.
-
#hostname ⇒ Object
deprecated
Deprecated.
use #virtual_host
-
#work_queue ⇒ WorkQueue
readonly
Work queue to execute code serialized correctly for this connection.
Attributes included from Util::Wrapper
Class Method Summary collapse
Instance Method Summary collapse
- #apply(opts) ⇒ Object
-
#close(error = nil) ⇒ Object
Closes the local end of the connection.
-
#connection ⇒ Connection
Self.
-
#container_id ⇒ Object
AMQP container ID advertised by the remote peer. To get the local container ID use #container and Qpid::Proton::Container#id.
-
#default_session ⇒ Session
Returns the default session for this connection.
-
#desired_capabilities ⇒ Array<Symbol>
Desired capabilities provided by the remote peer.
-
#each_link ⇒ Object
Get the links on this connection.
-
#each_receiver ⇒ Object
Get the Receiver links - see #each_link.
-
#each_sender ⇒ Object
Get the Sender links - see #each_link.
-
#each_session(&block) ⇒ Object
Get the sessions on this connection.
- #error ⇒ Object deprecated Deprecated.
-
#idle_timeout ⇒ Numeric?
Idle-timeout advertised by the remote peer, in seconds.
-
#initialize(impl = Cproton.pn_connection) ⇒ Connection
constructor
A new instance of Connection.
-
#link_head(mask) ⇒ Object
deprecated
Deprecated.
use #each_link
- #link_name ⇒ Object
-
#max_frame_size ⇒ Integer?
Maximum frame size, in bytes, advertised by the remote peer.
-
#max_sessions ⇒ Integer?
Session limit advertised by the remote peer.
-
#offered_capabilities ⇒ Array<Symbol>
Offered capabilities provided by the remote peer.
-
#open(opts = nil) ⇒ Object
Open the local end of the connection.
-
#open_receiver(opts = nil) ⇒ Object
Open a receiver on the default_session.
-
#open_sender(opts = nil) ⇒ Object
Open a sender on the default_session.
-
#open_session ⇒ Object
Open a new session on this connection.
-
#overrides? ⇒ Boolean
deprecated
Deprecated.
no replacement
-
#properties ⇒ Hash
Connection-properties provided by the remote peer.
-
#session_head(mask) ⇒ Object
deprecated
Deprecated.
use #each_session
-
#session_policy? ⇒ Boolean
deprecated
Deprecated.
no replacement
-
#state ⇒ Integer
Gets the endpoint current state flags.
-
#transport ⇒ Transport?
Transport bound to this connection, or nil if unbound.
-
#user ⇒ String
User name set on this connection (outgoing connection), or the authenticated user name (incoming connection).
-
#virtual_host ⇒ String
The AMQP hostname for the connection.
-
#work_head ⇒ Object
deprecated
Deprecated.
use a MessagingHandler to handle work
Methods included from Util::Deprecation
deprecated, #deprecated, included
Methods included from Util::Wrapper
included, #inspect, registry, #to_s
Methods inherited from Endpoint
#check_state, #closed?, #condition, #local_closed?, #local_condition, #local_open?, #local_uninit?, #open?, #remote_closed?, #remote_condition, #remote_open?, #remote_uninit?
Constructor Details
#initialize(impl = Cproton.pn_connection) ⇒ Connection
Returns a new instance of Connection.
34 35 36 37 38 39 40 41 42 |
# File 'lib/core/connection.rb', line 34
# Build a wrapper around a proton connection object.
#
# @param impl the underlying proton connection object; when omitted a new
#   one is created with +Cproton.pn_connection+
def initialize(impl = Cproton.pn_connection)
  super()
  @impl = impl
  # Link-name generation state, consumed by #link_name.
  @link_prefix = ""
  @link_count = 0
  @session_policy = nil
  @overrides = nil
  # Register this wrapper under the :pn_connection_attachments key.
  self.class.store_instance(self, :pn_connection_attachments)
end
Instance Attribute Details
#container ⇒ Container (readonly)
Returns the container managing this connection.
76 77 78 |
# File 'lib/core/connection.rb', line 76
# @return [Container] the container managing this connection
#   (the value stored in +@container+, which #apply assigns from +opts[:container]+)
def container
  return @container
end
#hostname ⇒ Object
use #virtual_host
50 |
# File 'lib/core/connection.rb', line 50 proton_set_get :hostname |
#work_queue ⇒ WorkQueue (readonly)
Returns work queue to execute code serialized correctly for this connection.
289 290 291 |
# File 'lib/core/connection.rb', line 289
# @return [WorkQueue] work queue to execute code serialized correctly for
#   this connection (the value stored in +@work_queue+)
def work_queue
  return @work_queue
end
Class Method Details
.wrap(impl) ⇒ Object
28 29 30 31 |
# File 'lib/core/connection.rb', line 28
# Return the Connection wrapper for a proton connection object.
#
# Looks up an already-stored wrapper under the :pn_connection_attachments
# key and only builds a new Connection when none is found.
#
# @param impl the underlying proton connection object, may be nil
# @return [Connection, nil] the wrapper, or nil when +impl+ is nil
def self.wrap(impl)
  if impl.nil?
    nil
  else
    existing = self.fetch_instance(impl, :pn_connection_attachments)
    existing || Connection.new(impl)
  end
end
Instance Method Details
#apply(opts) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/core/connection.rb', line 126
# Apply connection options to the underlying proton connection.
#
# NOTE: Only connection options are set here.
# Transport options must be applied with {Transport#apply}
#
# @param opts [Hash] options; keys read here are :container, :container_id,
#   :user, :password, :virtual_host, :link_prefix, :offered_capabilities,
#   :desired_capabilities and :properties
def apply(opts)
  @container = opts[:container]

  # Container id precedence: explicit option, then the container's id, then a random UUID.
  id = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid
  id = id.to_s if id.is_a? Symbol # Allow symbols as container name
  Cproton.pn_connection_set_container(@impl, id)

  user = opts[:user]
  Cproton.pn_connection_set_user(@impl, user) if user
  password = opts[:password]
  Cproton.pn_connection_set_password(@impl, password) if password
  vhost = opts[:virtual_host]
  Cproton.pn_connection_set_hostname(@impl, vhost) if vhost

  # Generated link names (see #link_name) default to the container id as prefix.
  @link_prefix = opts[:link_prefix] || id

  Codec::Data.from_object(Cproton.pn_connection_offered_capabilities(@impl), opts[:offered_capabilities])
  Codec::Data.from_object(Cproton.pn_connection_desired_capabilities(@impl), opts[:desired_capabilities])
  Codec::Data.from_object(Cproton.pn_connection_properties(@impl), opts[:properties])
end
#close(error = nil) ⇒ Object
Closes the local end of the connection. The remote end may or may not be closed.
172 173 174 175 |
# File 'lib/core/connection.rb', line 172
# Closes the local end of the connection. The remote end may or may not be closed.
#
# @param error optional error, assigned to the local condition before the
#   close is issued (nil by default)
def close(error = nil)
  local = _local_condition
  Condition.assign(local, error)
  Cproton.pn_connection_close(@impl)
end
#connection ⇒ Connection
Returns self.
65 |
# File 'lib/core/connection.rb', line 65
# @return [Connection] self
def connection
  self
end
#container_id ⇒ Object
AMQP container ID advertised by the remote peer. To get the local container ID use #container and Qpid::Proton::Container#id
72 |
# File 'lib/core/connection.rb', line 72
# Container ID of the remote peer, as reported by
# +Cproton.pn_connection_remote_container+.
# To get the local container ID use {#container} and {Container#id}.
def container_id
  Cproton.pn_connection_remote_container(@impl)
end
#default_session ⇒ Session
Returns the default session for this connection.
194 195 196 |
# File 'lib/core/connection.rb', line 194
# Returns the default session for this connection.
# The session is opened with #open_session on first use and cached in +@session+.
#
# @return [Session] the default session
def default_session
  @session = open_session unless @session
  @session
end
#desired_capabilities ⇒ Array<Symbol>
Returns desired capabilities provided by the remote peer.
85 86 87 |
# File 'lib/core/connection.rb', line 85
# @return [Array<Symbol>] desired capabilities provided by the remote peer
def desired_capabilities
  remote = Cproton.pn_connection_remote_desired_capabilities(@impl)
  Codec::Data.to_object(remote)
end
#each_link {|l| ... } ⇒ Object #each_link ⇒ Enumerator
Get the links on this connection.
248 249 250 251 252 253 254 255 256 257 |
# File 'lib/core/connection.rb', line 248
# Get the links on this connection.
#
# @overload each_link
#   @yieldparam l [Link] each link in turn
#   @return self
# @overload each_link
#   @return [Enumerator] when no block is given
def each_link
  return enum_for(:each_link) unless block_given?
  current = Cproton.pn_link_head(@impl, 0)
  while current
    # Fetch the successor before yielding, in case the block closes the
    # current link and unlinks it.
    following = Cproton.pn_link_next(current, 0)
    yield Link.wrap(current)
    current = following
  end
  self
end
#each_receiver ⇒ Object
Get the Receiver links - see #each_link
266 267 268 269 |
# File 'lib/core/connection.rb', line 266
# Get the {Receiver} links - see {#each_link}.
#
# @yieldparam l [Link] each link for which +receiver?+ is true
# @return [Enumerator] when no block is given; otherwise the Array built by
#   +select+, i.e. the receiver links for which the block returned a truthy value
def each_receiver
  return enum_for(:each_receiver) unless block_given?
  each_link.select do |link|
    next unless link.receiver?
    yield link
  end
end
#each_sender ⇒ Object
Get the Sender links - see #each_link
260 261 262 263 |
# File 'lib/core/connection.rb', line 260
# Get the {Sender} links - see {#each_link}.
#
# @yieldparam l [Link] each link for which +sender?+ is true
# @return [Enumerator] when no block is given; otherwise the Array built by
#   +select+, i.e. the sender links for which the block returned a truthy value
def each_sender
  return enum_for(:each_sender) unless block_given?
  each_link.select do |link|
    next unless link.sender?
    yield link
  end
end
#each_session {|s| ... } ⇒ Object #each_session ⇒ Enumerator
Get the sessions on this connection.
227 228 229 230 231 232 233 234 235 |
# File 'lib/core/connection.rb', line 227
# Get the sessions on this connection.
#
# @overload each_session
#   @yieldparam s [Session] each session in turn
#   @return self
# @overload each_session
#   @return [Enumerator] when no block is given
def each_session(&block)
  return enum_for(:each_session) unless block_given?
  current = Cproton.pn_session_head(@impl, 0)
  while current
    # Fetch the successor before yielding, in case the block closes the
    # current session and unlinks it (same precaution as #each_link).
    following = Cproton.pn_session_next(current, 0)
    yield Session.wrap(current)
    current = following
  end
  self
end
#error ⇒ Object
278 279 280 281 |
# File 'lib/core/connection.rb', line 278
# @deprecated use {#condition}
# Emits a deprecation notice, then returns the error code reported by
# +Cproton.pn_error_code+ for this connection's error object.
def error
  deprecated __method__, "#condition"
  err = Cproton.pn_connection_error(@impl)
  Cproton.pn_error_code(err)
end
#idle_timeout ⇒ Numeric?
Idle-timeout advertised by the remote peer, in seconds.
145 146 147 148 149 |
# File 'lib/core/connection.rb', line 145
# Idle-timeout advertised by the remote peer, in seconds.
#
# @return [Numeric, nil] the transport's +remote_idle_timeout+ divided by
#   1000 as a Rational; nil when there is no transport or no timeout value
def idle_timeout
  millis = transport && transport.remote_idle_timeout
  Rational(millis, 1000) if millis # More precise than Float
end
#link_head(mask) ⇒ Object
use #each_link
238 239 240 241 |
# File 'lib/core/connection.rb', line 238
# @deprecated use {#each_link}
# Emits a deprecation notice, then wraps the result of
# +Cproton.pn_link_head+ for the given mask.
#
# @param mask state mask passed through to +Cproton.pn_link_head+
def link_head(mask)
  deprecated __method__, "#each_link"
  head = Cproton.pn_link_head(@impl, mask)
  Link.wrap(head)
end
#link_name ⇒ Object
284 285 286 |
# File 'lib/core/connection.rb', line 284
# Generate the next link name for this connection.
#
# Increments the per-connection link counter and returns
# "<link_prefix>/<counter in base 32>".
#
# @return [String] the generated link name
def link_name
  @link_count += 1
  # Interpolation (rather than String#+) so that a non-String prefix, such as
  # a Symbol supplied via the :link_prefix option, does not raise.
  "#{@link_prefix}/#{@link_count.to_s(32)}"
end
#max_frame_size ⇒ Integer?
Maximum frame size, in bytes, advertised by the remote peer. See :max_frame_size
164 165 166 167 168 |
# File 'lib/core/connection.rb', line 164
# Maximum frame size, in bytes, advertised by the remote peer.
#
# @return [Integer, nil] the transport's +remote_max_frame+, or nil when it is zero
# @raise [StateError] if the connection is not bound to a transport
def max_frame_size
  raise StateError, "connection not bound to transport" unless transport
  limit = transport.remote_max_frame
  limit unless limit.zero?
end
#max_sessions ⇒ Integer?
Session limit advertised by the remote peer. See :max_sessions
154 155 156 157 158 |
# File 'lib/core/connection.rb', line 154
# Session limit advertised by the remote peer.
#
# @return [Integer, nil] the transport's +remote_channel_max+, or nil when it is zero
# @raise [StateError] if the connection is not bound to a transport
def max_sessions
  raise StateError, "connection not bound to transport" unless transport
  limit = transport.remote_channel_max
  limit unless limit.zero?
end
#offered_capabilities ⇒ Array<Symbol>
Returns offered capabilities provided by the remote peer.
79 80 81 |
# File 'lib/core/connection.rb', line 79
# @return [Array<Symbol>] offered capabilities provided by the remote peer
def offered_capabilities
  remote = Cproton.pn_connection_remote_offered_capabilities(@impl)
  Codec::Data.to_object(remote)
end
#open(opts = nil) ⇒ Object
Open the local end of the connection.
119 120 121 122 123 |
# File 'lib/core/connection.rb', line 119
# Open the local end of the connection.
#
# Does nothing when the local end is already open. Otherwise applies
# +opts+ (if given) with #apply and then opens the proton connection.
#
# @param opts [Hash, nil] connection options, see #apply
def open(opts = nil)
  # Use the Endpoint predicate #local_open? (checks the LOCAL_ACTIVE state);
  # no #local_active? predicate is listed among the Endpoint methods.
  return if local_open?
  apply opts if opts
  Cproton.pn_connection_open(@impl)
end
#open_receiver(opts = nil) ⇒ Object
Open a receiver on the default_session
214 |
# File 'lib/core/connection.rb', line 214
# Open a receiver on the default_session.
#
# @param opts receiver options, passed unchanged to the session's +open_receiver+
def open_receiver(opts = nil)
  session = default_session
  session.open_receiver(opts)
end
#open_sender(opts = nil) ⇒ Object
Open a sender on the default_session
210 |
# File 'lib/core/connection.rb', line 210
# Open a sender on the default_session.
#
# @param opts sender options, passed unchanged to the session's +open_sender+
def open_sender(opts = nil)
  session = default_session
  session.open_sender(opts)
end
#open_session ⇒ Object
Open a new session on this connection.
202 203 204 205 206 |
# File 'lib/core/connection.rb', line 202
# Open a new session on this connection.
#
# @return the wrapped session, after +open+ has been called on it
def open_session
  Session.wrap(Cproton.pn_session(@impl)).tap { |session| session.open }
end
#overrides? ⇒ Boolean
no replacement
59 |
# File 'lib/core/connection.rb', line 59
# @deprecated no replacement
# Emits a deprecation notice.
#
# @return [Boolean] always false
def overrides?
  deprecated __method__
  false
end
#properties ⇒ Hash
Returns connection-properties provided by the remote peer.
91 92 93 |
# File 'lib/core/connection.rb', line 91
# @return [Hash] connection-properties provided by the remote peer
def properties
  remote = Cproton.pn_connection_remote_properties(@impl)
  Codec::Data.to_object(remote)
end
#session_head(mask) ⇒ Object
use #each_session
217 218 219 220 |
# File 'lib/core/connection.rb', line 217
# @deprecated use {#each_session}
# Emits a deprecation notice, then wraps the result of
# +Cproton.pn_session_head+ for the given mask.
#
# @param mask state mask passed through to +Cproton.pn_session_head+
def session_head(mask)
  deprecated __method__, "#each_session"
  head = Cproton.pn_session_head(@impl, mask)
  Session.wrap(head)
end
#session_policy? ⇒ Boolean
no replacement
62 |
# File 'lib/core/connection.rb', line 62
# @deprecated no replacement
# Emits a deprecation notice.
#
# @return [Boolean] always false
def session_policy?
  deprecated __method__
  false
end
#state ⇒ Integer
Gets the endpoint current state flags
186 187 188 |
# File 'lib/core/connection.rb', line 186
# Gets the endpoint current state flags.
#
# @return [Integer] the value reported by +Cproton.pn_connection_state+
def state
  flags = Cproton.pn_connection_state(@impl)
  flags
end
#transport ⇒ Transport?
Returns transport bound to this connection, or nil if unbound.
68 |
# File 'lib/core/connection.rb', line 68
# @return [Transport, nil] transport bound to this connection, or nil if unbound
def transport
  bound = Cproton.pn_connection_transport(@impl)
  Transport.wrap(bound)
end
#user ⇒ String
User name set on this connection (outgoing connection), or the authenticated user name (incoming connection)
54 55 56 |
# File 'lib/core/connection.rb', line 54
# @return [String] the user name set on this connection if there is one;
#   otherwise the user reported by the bound transport (nil/false when
#   there is no transport)
def user
  configured = Cproton.pn_connection_get_user(impl)
  return configured if configured
  connection.transport && connection.transport.user
end
#virtual_host ⇒ String
Returns the AMQP hostname for the connection.
45 |
# File 'lib/core/connection.rb', line 45
# @return [String] the AMQP hostname for the connection, as reported by
#   +Cproton.pn_connection_remote_hostname+
def virtual_host
  Cproton.pn_connection_remote_hostname(@impl)
end
#work_head ⇒ Object
use a MessagingHandler to handle work
272 273 274 275 |
# File 'lib/core/connection.rb', line 272
# @deprecated use {MessagingHandler} to handle work
# Emits a deprecation notice, then wraps the result of
# +Cproton.pn_work_head+ as a Delivery.
def work_head
  deprecated __method__
  head = Cproton.pn_work_head(@impl)
  Delivery.wrap(head)
end