Class: Qpid::Proton::Connection

Inherits:
Endpoint
  • Object
show all
Includes:
Util::Deprecation, Util::Wrapper
Defined in:
lib/core/connection.rb

Overview

An AMQP connection.

Constant Summary collapse

PROTON_METHOD_PREFIX =
"pn_connection"

Constants included from Util::Deprecation

Util::Deprecation::DEPRECATE_FULL_TRACE, Util::Deprecation::MATCH_DIR

Constants inherited from Endpoint

Endpoint::LOCAL_ACTIVE, Endpoint::LOCAL_CLOSED, Endpoint::LOCAL_MASK, Endpoint::LOCAL_UNINIT, Endpoint::REMOTE_ACTIVE, Endpoint::REMOTE_CLOSED, Endpoint::REMOTE_MASK, Endpoint::REMOTE_UNINIT

Instance Attribute Summary collapse

Attributes included from Util::Wrapper

#impl

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util::Deprecation

deprecated, #deprecated, included

Methods included from Util::Wrapper

included, #inspect, registry, #to_s

Methods inherited from Endpoint

#check_state, #closed?, #condition, #local_closed?, #local_condition, #local_open?, #local_uninit?, #open?, #remote_closed?, #remote_condition, #remote_open?, #remote_uninit?

Constructor Details

#initialize(impl = Cproton.pn_connection) ⇒ Connection

Returns a new instance of Connection.



34
35
36
37
38
39
40
41
42
# File 'lib/core/connection.rb', line 34

# Build a Connection wrapper around a proton-C connection handle.
#
# @param impl the underlying connection handle; when omitted a new one is
#   obtained from Cproton.pn_connection
def initialize(impl = Cproton.pn_connection)
  super()
  @impl = impl
  # State consumed by #link_name to generate link names.
  @link_prefix = ""
  @link_count = 0
  # Reset here only; no other method shown for this class reads these two.
  @session_policy = nil
  @overrides = nil
  # Register the wrapper under the same key that .wrap looks instances up by.
  self.class.store_instance(self, :pn_connection_attachments)
end

Instance Attribute Details

#container ⇒ Container (readonly)

Returns the container managing this connection.

Returns:

  • (Container)

    the container managing this connection



76
77
78
# File 'lib/core/connection.rb', line 76

# @return [Container] the container managing this connection; this is the
#   value #apply stored from the :container option
def container() @container; end

#hostname ⇒ Object

Deprecated.


50
# File 'lib/core/connection.rb', line 50

# Deprecated hostname accessor pair, generated by the proton_set_get macro.
# NOTE(review): the macro is defined outside this file; presumably it builds the
# getter/setter from PROTON_METHOD_PREFIX ("pn_connection") — confirm there.
proton_set_get :hostname

#work_queue ⇒ WorkQueue (readonly)

Returns work queue to execute code serialized correctly for this connection.

Returns:

  • (WorkQueue)

    work queue to execute code serialized correctly for this connection



289
290
291
# File 'lib/core/connection.rb', line 289

# @return [WorkQueue] work queue to execute code serialized correctly for
#   this connection (simply the stored @work_queue)
def work_queue() @work_queue; end

Class Method Details

.wrap(impl) ⇒ Object



28
29
30
31
# File 'lib/core/connection.rb', line 28

# Return the Connection wrapper for a proton-C connection handle.
#
# @param impl the underlying connection handle, may be nil
# @return [Connection, nil] nil for a nil handle; otherwise the instance
#   already registered under :pn_connection_attachments, or a new Connection
def self.wrap(impl)
  return if impl.nil?
  existing = fetch_instance(impl, :pn_connection_attachments)
  existing || Connection.new(impl)
end

Instance Method Details

#apply(opts) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/core/connection.rb', line 126

# Apply connection-level options to the underlying proton-C connection.
#
# NOTE: Only connection options are set here.
# Transport options must be applied with {Transport#apply}
#
# @param opts [Hash] options as described for #open
def apply opts
  @container = opts[:container]

  # Container id precedence: explicit option, then the container's id, then a random UUID.
  cid = opts[:container_id]
  cid ||= @container.id if @container
  cid ||= SecureRandom.uuid
  cid = cid.to_s if cid.is_a? Symbol # Allow symbols as container name
  Cproton.pn_connection_set_container(@impl, cid)

  # Optional settings are only pushed down when the caller supplied them.
  user = opts[:user]
  Cproton.pn_connection_set_user(@impl, user) if user
  password = opts[:password]
  Cproton.pn_connection_set_password(@impl, password) if password
  virtual_host = opts[:virtual_host]
  Cproton.pn_connection_set_hostname(@impl, virtual_host) if virtual_host

  # Link names default to being prefixed with the container id (see #link_name).
  @link_prefix = opts[:link_prefix] || cid

  # These three are always written, even when the option is absent (nil).
  Codec::Data.from_object(Cproton.pn_connection_offered_capabilities(@impl), opts[:offered_capabilities])
  Codec::Data.from_object(Cproton.pn_connection_desired_capabilities(@impl), opts[:desired_capabilities])
  Codec::Data.from_object(Cproton.pn_connection_properties(@impl), opts[:properties])
end

#close(error = nil) ⇒ Object

Closes the local end of the connection. The remote end may or may not be closed.

Parameters:

  • error (Condition) (defaults to: nil)

    Optional error condition to send with the close.



172
173
174
175
# File 'lib/core/connection.rb', line 172

# Closes the local end of the connection. The remote end may or may not be closed.
#
# @param error [Condition] Optional error condition to send with the close.
def close(error = nil)
  # Record the condition on the local end first, then close.
  target = _local_condition
  Condition.assign(target, error)
  Cproton.pn_connection_close(@impl)
end

#connection ⇒ Connection

Returns self.

Returns:



65
# File 'lib/core/connection.rb', line 65

# @return [Connection] self
def connection
  self
end

#container_id ⇒ Object

To get the local container ID use #container and Qpid::Proton::Container#id

Returns:

  • AMQP container ID advertised by the remote peer.



72
# File 'lib/core/connection.rb', line 72

# To get the local container ID use #container and Container#id.
#
# @return AMQP container ID advertised by the remote peer.
def container_id
  Cproton.pn_connection_remote_container(@impl)
end

#default_session ⇒ Session

Returns the default session for this connection.

Returns:



194
195
196
# File 'lib/core/connection.rb', line 194

# @return [Session] the default session for this connection; it is opened
#   with #open_session on first use and reused afterwards
def default_session
  return @session if @session
  @session = open_session
end

#desired_capabilities ⇒ Array<Symbol>

Returns desired capabilities provided by the remote peer.

Returns:

  • (Array<Symbol>)

    desired capabilities provided by the remote peer



85
86
87
# File 'lib/core/connection.rb', line 85

# @return [Array<Symbol>] desired capabilities provided by the remote peer
def desired_capabilities
  raw = Cproton.pn_connection_remote_desired_capabilities(@impl)
  Codec::Data.to_object(raw)
end

#each_link {|l| ... } ⇒ Object #each_link ⇒ Enumerator

Get the links on this connection.

Overloads:

  • #each_link {|l| ... } ⇒ Object

    Yield Parameters:

    • l (Link)

      pass each link to block

  • #each_link ⇒ Enumerator

    Returns enumerator over links.

    Returns:

    • (Enumerator)

      enumerator over links



248
249
250
251
252
253
254
255
256
257
# File 'lib/core/connection.rb', line 248

# Get the links on this connection.
#
# @yieldparam l [Link] pass each link to block
# @return [Enumerator] enumerator over links when no block is given,
#   otherwise self
def each_link
  return enum_for(:each_link) unless block_given?
  upcoming = Cproton.pn_link_head(@impl, 0)
  while upcoming
    current = upcoming
    # Advance before yielding, in case the block closes the link and unlinks it.
    upcoming = Cproton.pn_link_next(current, 0)
    yield Link.wrap(current)
  end
  self
end

#each_receiver ⇒ Object

Get the Receiver links - see #each_link



266
267
268
269
# File 'lib/core/connection.rb', line 266

# Get the Receiver links - see #each_link
#
# Yields every link for which receiver? is true. Because the iteration is
# built on select, the block form returns an Array of the receivers for which
# the block returned a truthy value; without a block an Enumerator is returned.
def each_receiver
  return enum_for(:each_receiver) unless block_given?
  each_link.select do |link|
    link.receiver? && yield(link)
  end
end

#each_sender ⇒ Object

Get the Sender links - see #each_link



260
261
262
263
# File 'lib/core/connection.rb', line 260

# Get the Sender links - see #each_link
#
# Yields every link for which sender? is true. Because the iteration is
# built on select, the block form returns an Array of the senders for which
# the block returned a truthy value; without a block an Enumerator is returned.
def each_sender
  return enum_for(:each_sender) unless block_given?
  each_link.select do |link|
    link.sender? && yield(link)
  end
end

#each_session {|s| ... } ⇒ Object #each_session ⇒ Enumerator

Get the sessions on this connection.

Overloads:

  • #each_session {|s| ... } ⇒ Object

    Yield Parameters:

    • s (Session)

      pass each session to block

  • #each_session ⇒ Enumerator

    Returns enumerator over sessions.

    Returns:

    • (Enumerator)

      enumerator over sessions



227
228
229
230
231
232
233
234
235
# File 'lib/core/connection.rb', line 227

# Get the sessions on this connection.
#
# @yieldparam s [Session] pass each session to block
# @return [Enumerator] enumerator over sessions when no block is given,
#   otherwise self
def each_session(&block)
  return enum_for(:each_session) unless block_given?
  upcoming = Cproton.pn_session_head(@impl, 0)
  while upcoming
    current = upcoming
    # Advance before yielding, in case the block closes the session and
    # unlinks it; this is the same precaution #each_link takes for links.
    upcoming = Cproton.pn_session_next(current, 0)
    yield Session.wrap(current)
  end
  self
end

#error ⇒ Object

Deprecated.


278
279
280
281
# File 'lib/core/connection.rb', line 278

# @deprecated use #condition
# @return the error code taken from the connection's proton-C error object
def error
  deprecated __method__, "#condition"
  pn_error = Cproton.pn_connection_error(@impl)
  Cproton.pn_error_code(pn_error)
end

#idle_timeout ⇒ Numeric?

Idle-timeout advertised by the remote peer, in seconds.

Returns:

  • (Numeric)

    Idle-timeout advertised by the remote peer, in seconds.

  • (nil)

    if the peer does not advertise an idle time-out



145
146
147
148
149
# File 'lib/core/connection.rb', line 145

# Idle-timeout advertised by the remote peer, in seconds.
#
# @return [Numeric] the remote idle-timeout, converted from milliseconds
# @return [nil] if no transport is bound, or if the peer does not advertise
#   an idle time-out (the transport reports that as 0, which is truthy in
#   Ruby and so must be checked explicitly)
def idle_timeout()
  bound = transport # look the transport up once
  return nil unless bound
  millis = bound.remote_idle_timeout
  return nil if millis.nil? || millis.zero?
  Rational(millis, 1000)       # More precise than Float
end

#link_head(mask) ⇒ Object

Deprecated.


238
239
240
241
# File 'lib/core/connection.rb', line 238

# @deprecated use #each_link
# @param mask state mask handed through to Cproton.pn_link_head
# @return the wrapped head link
def link_head(mask)
  deprecated __method__, "#each_link"
  head = Cproton.pn_link_head(@impl, mask)
  Link.wrap(head)
end


284
285
286
# File 'lib/core/connection.rb', line 284

# Generate the next link name for this connection.
#
# The name is "<link_prefix>/<n>" where n is a per-connection counter rendered
# in base 32. Interpolation is used rather than String#+ so that a non-String
# prefix (for example a Symbol passed as :link_prefix to #apply) still works.
#
# @return [String] the generated link name
def link_name()
  @link_count += 1
  "#{@link_prefix}/#{@link_count.to_s(32)}"
end

#max_frame_size ⇒ Integer?

Maximum frame size, in bytes, advertised by the remote peer. See :max_frame_size

Returns:

  • (Integer)

    maximum frame size

  • (nil)

    no limit

Raises:

  • (StateError)

    if the connection is not bound to a transport



164
165
166
167
168
# File 'lib/core/connection.rb', line 164

# Maximum frame size, in bytes, advertised by the remote peer.
#
# @return [Integer] maximum frame size
# @return [nil] no limit (the transport reports 0)
# @raise [StateError] if the connection is not bound to a transport
def max_frame_size
  raise StateError, "connection not bound to transport" unless transport
  limit = transport.remote_max_frame
  return nil if limit.zero?
  limit
end

#max_sessions ⇒ Integer?

Session limit advertised by the remote peer. See :max_sessions

Returns:

  • (Integer)

    maximum number of sessions per connection allowed by remote peer.

  • (nil)

    no specific limit is set.

Raises:

  • (StateError)

    if the connection is not bound to a transport



154
155
156
157
158
# File 'lib/core/connection.rb', line 154

# Session limit advertised by the remote peer.
#
# @return [Integer] maximum number of sessions per connection allowed by remote peer.
# @return [nil] no specific limit is set (the transport reports 0)
# @raise [StateError] if the connection is not bound to a transport
def max_sessions
  raise StateError, "connection not bound to transport" unless transport
  limit = transport.remote_channel_max
  return nil if limit.zero?
  limit
end

#offered_capabilities ⇒ Array<Symbol>

Returns offered capabilities provided by the remote peer.

Returns:

  • (Array<Symbol>)

    offered capabilities provided by the remote peer



79
80
81
# File 'lib/core/connection.rb', line 79

# @return [Array<Symbol>] offered capabilities provided by the remote peer
def offered_capabilities
  raw = Cproton.pn_connection_remote_offered_capabilities(@impl)
  Codec::Data.to_object(raw)
end

#open(opts = nil) ⇒ Object

Open the local end of the connection.

Parameters:

  • opts (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (opts):

  • :handler (MessagingHandler)

    handler for events related to this connection.

  • :user (String)

    User name for authentication

  • :password (String)

    Authentication secret

  • :virtual_host (String)

    Virtual host name

  • :container_id (String) — default: provided by {Container}

    override advertised container-id

  • :properties (Hash<Symbol=>Object>)

    Application-defined properties

  • :offered_capabilities (Array<Symbol>)

    Extensions the endpoint supports

  • :desired_capabilities (Array<Symbol>)

    Extensions the endpoint can use

  • :idle_timeout (Numeric)

    Seconds before closing an idle connection

  • :max_sessions (Integer)

    Limit the number of active sessions

  • :max_frame_size (Integer)

    Limit the size of AMQP frames

  • :sasl_enabled (Boolean) — default: false

    Enable or disable SASL.

  • :sasl_allow_insecure_mechs (Boolean) — default: false

    Allow mechanisms that send secrets in clear text

  • :sasl_allowed_mechs (String)

    SASL mechanisms allowed by this end of the connection

  • :ssl_domain (SSLDomain)

    SSL configuration domain.



119
120
121
122
123
# File 'lib/core/connection.rb', line 119

# Open the local end of the connection.
#
# Does nothing (and returns nil) when the local end is already active;
# otherwise applies any supplied options and then opens the connection.
#
# @param opts [Hash] connection options, passed to #apply when given
def open(opts = nil)
  unless local_active?
    apply(opts) if opts
    Cproton.pn_connection_open(@impl)
  end
end

#open_receiver(opts = nil) ⇒ Object

Open a receiver on the default_session



214
# File 'lib/core/connection.rb', line 214

# Open a receiver on the #default_session.
#
# @param opts options handed unchanged to the session's open_receiver
def open_receiver(opts = nil)
  default_session.open_receiver(opts)
end

#open_sender(opts = nil) ⇒ Object

Open a sender on the default_session



210
# File 'lib/core/connection.rb', line 210

# Open a sender on the #default_session.
#
# @param opts options handed unchanged to the session's open_sender
def open_sender(opts = nil)
  default_session.open_sender(opts)
end

#open_session ⇒ Object

Open a new session on this connection.



202
203
204
205
206
# File 'lib/core/connection.rb', line 202

# Open a new session on this connection.
#
# @return the wrapped session, after open has been called on it
def open_session
  Session.wrap(Cproton.pn_session(@impl)).tap { |session| session.open }
end

#overrides? ⇒ Boolean

Deprecated.

no replacement

Returns:

  • (Boolean)


59
# File 'lib/core/connection.rb', line 59

# @deprecated no replacement
# @return [Boolean] always false
def overrides?
  deprecated __method__
  false
end

#properties ⇒ Hash

Returns connection-properties provided by the remote peer.

Returns:

  • (Hash)

    connection-properties provided by the remote peer



91
92
93
# File 'lib/core/connection.rb', line 91

# @return [Hash] connection-properties provided by the remote peer
def properties
  raw = Cproton.pn_connection_remote_properties(@impl)
  Codec::Data.to_object(raw)
end

#session_head(mask) ⇒ Object

Deprecated.


217
218
219
220
# File 'lib/core/connection.rb', line 217

# @deprecated use #each_session
# @param mask state mask handed through to Cproton.pn_session_head
# @return the wrapped head session
def session_head(mask)
  deprecated __method__, "#each_session"
  head = Cproton.pn_session_head(@impl, mask)
  Session.wrap(head)
end

#session_policy? ⇒ Boolean

Deprecated.

no replacement

Returns:

  • (Boolean)


62
# File 'lib/core/connection.rb', line 62

# @deprecated no replacement
# @return [Boolean] always false
def session_policy?
  deprecated __method__
  false
end

#state ⇒ Integer

Gets the endpoint current state flags

Returns:

  • (Integer)

    The state flags.

See Also:

  • Endpoint#LOCAL_UNINIT
  • Endpoint#LOCAL_ACTIVE
  • Endpoint#LOCAL_CLOSED
  • Endpoint#LOCAL_MASK


186
187
188
# File 'lib/core/connection.rb', line 186

# Gets the endpoint current state flags, as reported by Cproton.pn_connection_state.
#
# @return [Integer] The state flags.
def state() Cproton.pn_connection_state(@impl); end

#transport ⇒ Transport?

Returns transport bound to this connection, or nil if unbound.

Returns:

  • (Transport, nil)

    transport bound to this connection, or nil if unbound.



68
# File 'lib/core/connection.rb', line 68

# @return [Transport, nil] transport bound to this connection, or nil if unbound.
def transport
  raw = Cproton.pn_connection_transport(@impl)
  Transport.wrap(raw)
end

#user ⇒ String

Returns the user name used for authentication (outgoing connection) or the authenticated user name (incoming connection).

Returns:

  • (String)

    User name used for authentication (outgoing connection)



54
55
56
# File 'lib/core/connection.rb', line 54

# @return [String] User name used for authentication (outgoing connection)
#   or the authenticated user name (incoming connection)
def user()
  # Prefer the user name configured on the connection itself.
  configured = Cproton.pn_connection_get_user(@impl)
  return configured if configured
  # Otherwise fall back to the transport's user; nil when no transport is bound.
  bound = connection.transport
  bound && bound.user
end

#virtual_host ⇒ String

Returns the AMQP hostname for the connection.

Returns:

  • (String)

    The AMQP hostname for the connection.



45
# File 'lib/core/connection.rb', line 45

# @return [String] The AMQP hostname for the connection, read from
#   Cproton.pn_connection_remote_hostname
def virtual_host
  Cproton.pn_connection_remote_hostname(@impl)
end

#work_head ⇒ Object

Deprecated.

use #MessagingHandler to handle work



272
273
274
275
# File 'lib/core/connection.rb', line 272

# @deprecated use MessagingHandler to handle work
# @return the wrapped delivery at the head of the connection's work list
def work_head
  deprecated __method__
  head = Cproton.pn_work_head(@impl)
  Delivery.wrap(head)
end