Class: Bunny::Transport

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/transport.rb

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

Default TCP connection timeout

30.0
DEFAULT_READ_TIMEOUT =
30.0
DEFAULT_WRITE_TIMEOUT =
30.0
TLS_VERSION_ALIASES =

mimics METHODS_MAP in ssl.rb but also lists TLS 1.3 and string constants

{
  # Symbolic protocol names
  TLSv1: OpenSSL::SSL::TLS1_VERSION,
  TLSv1_1: OpenSSL::SSL::TLS1_1_VERSION,
  TLSv1_2: OpenSSL::SSL::TLS1_2_VERSION,
  # Dotted version numbers. The quoted-label syntax produces Symbol keys
  # (:"1.0"), not String keys.
  "1.0": OpenSSL::SSL::TLS1_VERSION,
  "1.1": OpenSSL::SSL::TLS1_1_VERSION,
  "1.2": OpenSSL::SSL::TLS1_2_VERSION,
  # Identity entries: an OpenSSL version constant used as a key maps to itself
  OpenSSL::SSL::TLS1_VERSION => OpenSSL::SSL::TLS1_VERSION,
  OpenSSL::SSL::TLS1_1_VERSION => OpenSSL::SSL::TLS1_1_VERSION,
  OpenSSL::SSL::TLS1_2_VERSION => OpenSSL::SSL::TLS1_2_VERSION
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, host, port, opts) ⇒ Transport

Returns a new instance of Transport.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/bunny/transport.rb', line 62

# Prepares a transport for the given session and endpoint. No socket is
# opened here: @socket is left nil.
#
# Options read directly by this method:
#   :session_error_handler - object whose #raise receives network failures
#   :read_timeout          - falls back to DEFAULT_READ_TIMEOUT
#   :socket_timeout        - legacy name for the write timeout, takes precedence
#   :write_timeout         - falls back to DEFAULT_WRITE_TIMEOUT
# The same opts hash is also handed to tls_enabled?, timeout_from and
# (when TLS is enabled) prepare_tls_context.
#
# Every timeout that equals 0 is stored as nil.
def initialize(session, host, port, opts)
  @session               = session
  @session_error_handler = opts[:session_error_handler]
  @host                  = host
  @port                  = port
  @opts                  = opts
  @logger                = session.logger

  @tls_enabled = tls_enabled?(opts)

  read_timeout  = opts[:read_timeout] || DEFAULT_READ_TIMEOUT
  @read_timeout = read_timeout == 0 ? nil : read_timeout

  # :socket_timeout is kept for backwards compatibility and wins over :write_timeout
  write_timeout  = opts[:socket_timeout] || opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
  @write_timeout = write_timeout == 0 ? nil : write_timeout

  connect_timeout  = self.timeout_from(opts)
  @connect_timeout = connect_timeout == 0 ? nil : connect_timeout

  # First timeout that is set, in the order write, read, connect
  @disconnect_timeout = @write_timeout || @read_timeout || @connect_timeout

  @writes_mutex = session.mutex_impl.new
  @socket       = nil

  prepare_tls_context(opts) if @tls_enabled
end

Instance Attribute Details

#connect_timeout ⇒ Object (readonly)

Returns the value of attribute connect_timeout.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Connection timeout as stored by the constructor: the value returned by
# timeout_from(opts), with 0 replaced by nil.
def connect_timeout
  @connect_timeout
end

#disconnect_timeout ⇒ Object (readonly)

Returns the value of attribute disconnect_timeout.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Timeout chosen by the constructor as the first non-nil value among the
# write, read and connect timeouts (in that order); nil if all three are nil.
def disconnect_timeout
  @disconnect_timeout
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Host passed to the constructor; used as the target when the socket is opened.
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Port passed to the constructor; used as the target when the socket is opened.
def port
  @port
end

#read_timeout ⇒ Object

Returns the value of attribute read_timeout.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Read timeout handed to the socket by #read_fully. The constructor sets it to
# opts[:read_timeout] or DEFAULT_READ_TIMEOUT, storing nil in place of 0.
def read_timeout
  @read_timeout
end

#session ⇒ Object (readonly)

Returns the value of attribute session.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Session object passed to the constructor; network failures are reported to it.
def session
  @session
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Underlying socket. nil after construction; assigned by #initialize_socket
# and possibly replaced by a TLS-wrapped socket in #post_initialize_socket.
def socket
  @socket
end

#tls_ca_certificates ⇒ Object (readonly)

Returns the value of attribute tls_ca_certificates.



55
56
57
# File 'lib/bunny/transport.rb', line 55

# Returns @tls_ca_certificates.
# NOTE(review): presumably populated by prepare_tls_context when TLS is
# enabled (the assignment is not visible here) - confirm.
def tls_ca_certificates
  @tls_ca_certificates
end

#tls_certificate_path ⇒ Object (readonly)

Returns the value of attribute tls_certificate_path.



55
56
57
# File 'lib/bunny/transport.rb', line 55

# Returns @tls_certificate_path.
# NOTE(review): presumably populated by prepare_tls_context when TLS is
# enabled (the assignment is not visible here) - confirm.
def tls_certificate_path
  @tls_certificate_path
end

#tls_context ⇒ Object (readonly)

Returns the value of attribute tls_context.



55
56
57
# File 'lib/bunny/transport.rb', line 55

# Returns @tls_context, the object yielded by #configure_tls_context.
# NOTE(review): presumably built by prepare_tls_context when TLS is enabled
# (the assignment is not visible here) - confirm.
def tls_context
  @tls_context
end

#tls_key_path ⇒ Object (readonly)

Returns the value of attribute tls_key_path.



55
56
57
# File 'lib/bunny/transport.rb', line 55

# Returns @tls_key_path.
# NOTE(review): presumably populated by prepare_tls_context when TLS is
# enabled (the assignment is not visible here) - confirm.
def tls_key_path
  @tls_key_path
end

#verify_peer ⇒ Object (readonly)

Returns the value of attribute verify_peer.



55
56
57
# File 'lib/bunny/transport.rb', line 55

# Returns @verify_peer. When truthy, #connect runs post_connection_check
# against the host after the TLS handshake.
# NOTE(review): where the flag is assigned is not visible here - confirm.
def verify_peer
  @verify_peer
end

#write_timeout ⇒ Object (readonly)

Returns the value of attribute write_timeout.



54
55
56
# File 'lib/bunny/transport.rb', line 54

# Write timeout used by #write. The constructor takes opts[:socket_timeout]
# (legacy name) first, then opts[:write_timeout], then DEFAULT_WRITE_TIMEOUT,
# storing nil in place of 0. When nil, #write delegates to #write_without_timeout.
def write_timeout
  @write_timeout
end

Class Method Details

.ping!(host, port, timeout) ⇒ Object

Raises:

  • (ConnectionTimeout)



295
296
297
# File 'lib/bunny/transport.rb', line 295

# Checks that host:port accepts connections, raising when it does not.
#
# @return [nil] when reacheable? reports the endpoint as reachable
# @raise [ConnectionTimeout] when reacheable? returns a falsy value
def self.ping!(host, port, timeout)
  return if reacheable?(host, port, timeout)

  raise ConnectionTimeout.new("#{host}:#{port} is unreachable")
end

.reacheable?(host, port, timeout) ⇒ Boolean

Returns:

  • (Boolean)


282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/bunny/transport.rb', line 282

# Tells whether a TCP connection to host:port can be opened within the
# given connect timeout. The probe socket is always closed again.
#
# Besides name-resolution failures (SocketError) and timeouts, OS-level
# connection errors such as Errno::ECONNREFUSED or Errno::EHOSTUNREACH
# (all SystemCallError subclasses) also mean "not reachable", so they are
# reported as false instead of escaping from this predicate.
#
# @param host target host
# @param port target port
# @param timeout passed to the socket implementation as :connect_timeout
# @return [Boolean] true if the socket could be opened, false otherwise
def self.reacheable?(host, port, timeout)
  begin
    s = Bunny::SocketImpl.open(host, port,
      :connect_timeout => timeout)

    true
  rescue SocketError, SystemCallError, Timeout::Error => _e
    false
  ensure
    s.close if s
  end
end

Instance Method Details

#close(reason = nil) ⇒ Object



222
223
224
# File 'lib/bunny/transport.rb', line 222

# Closes the socket if it is currently open; otherwise does nothing.
# The reason argument is accepted but not used by this method.
#
# @return the result of @socket.close, or nil when nothing was open
def close(reason = nil)
  return unless open?

  @socket.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


230
231
232
# File 'lib/bunny/transport.rb', line 230

# Negation of #open?: true when there is no socket yet or the socket
# reports itself as closed.
def closed?
  !open?
end

#configure_socket(&block) ⇒ Object



148
149
150
# File 'lib/bunny/transport.rb', line 148

# Passes the current socket to the given block so the caller can tune it.
# Does nothing (and never calls the block) while no socket exists.
#
# @return the block's result, or nil when there is no socket
def configure_socket(&block)
  return unless @socket

  block.call(@socket)
end

#configure_tls_context(&block) ⇒ Object



152
153
154
# File 'lib/bunny/transport.rb', line 152

# Passes the TLS context to the given block so the caller can adjust it.
# Does nothing (and never calls the block) while no TLS context exists.
#
# @return the block's result, or nil when there is no TLS context
def configure_tls_context(&block)
  return unless @tls_context

  block.call(@tls_context)
end

#connect ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/bunny/transport.rb', line 110

# Completes the TLS side of connection setup. For a non-TLS transport this
# is a no-op returning nil.
#
# For TLS it runs the handshake on @socket, logs the peer certificate and
# chain at DEBUG level, runs post_connection_check against the host when
# @verify_peer is set, and finally marks the transport as connected.
#
# @return the socket on success, nil for non-TLS transports
# @raise [OpenSSL::SSL::SSLError] re-raised after logging when the handshake
#   or the peer verification fails
def connect
  return unless uses_tls?

  begin
    @socket.connect
  rescue OpenSSL::SSL::SSLError => handshake_error
    @logger.error { "TLS connection failed: #{handshake_error.message}" }
    raise handshake_error
  end

  log_peer_certificate_info(Logger::DEBUG, @socket.peer_cert)
  log_peer_certificate_chain_info(Logger::DEBUG, @socket.peer_cert_chain)

  begin
    @socket.post_connection_check(host) if @verify_peer
  rescue OpenSSL::SSL::SSLError => verification_error
    @logger.error do
      "Peer verification of target server failed: #{verification_error.message}. " \
        "Target hostname: #{hostname}, see peer certificate chain details below."
    end
    # Repeat the certificate details at ERROR level so they appear next to the failure
    log_peer_certificate_info(Logger::ERROR, @socket.peer_cert)
    log_peer_certificate_chain_info(Logger::ERROR, @socket.peer_cert_chain)

    raise verification_error
  end

  @status = :connected

  @socket
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/bunny/transport.rb', line 144

# True only when the transport has been marked :connected and the socket is
# still open. Returns false as soon as the status is anything else; otherwise
# the answer is whatever #open? reports.
def connected?
  return false unless :connected == @status

  open?
end

#flush ⇒ Object



234
235
236
# File 'lib/bunny/transport.rb', line 234

# Flushes the socket if one exists; does nothing otherwise.
#
# @return the result of @socket.flush, or nil when there is no socket
def flush
  return unless @socket

  @socket.flush
end

#hostname ⇒ Object



91
92
93
# File 'lib/bunny/transport.rb', line 91

# Returns @host, i.e. the same value as #host. Used in the peer verification
# error message and when building TCPConnectionFailed.
def hostname
  @host
end

#initialize_socket ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/bunny/transport.rb', line 299

# Opens a new socket to @host:@port and stores it in @socket.
#
# The socket implementation receives the :keepalive option from the
# constructor options and the connect timeout computed by the constructor.
#
# @return the newly opened socket
# @raise [Bunny::TCPConnectionFailed] wrapping any StandardError or
#   ClientTimeout raised while opening; @status is set to :not_connected first
def initialize_socket
  @logger.debug("Using connection timeout of #{@connect_timeout} when connecting to #{@host}:#{@port}")
  begin
    @socket = Bunny::SocketImpl.open(@host, @port,
      :keepalive      => @opts[:keepalive],
      :connect_timeout => @connect_timeout)
  rescue StandardError, ClientTimeout => e
    @status = :not_connected
    raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port)
  end

  @socket
end

#local_address ⇒ Object



95
96
97
# File 'lib/bunny/transport.rb', line 95

# Delegates to the socket's local_address.
# There is no nil guard: calling this while @socket is still nil (its value
# right after construction) raises NoMethodError.
def local_address
  @socket.local_address
end

#maybe_initialize_socket ⇒ Object



313
314
315
# File 'lib/bunny/transport.rb', line 313

# Opens a socket only when one is needed: there is none yet, or the
# transport reports itself as closed. A usable socket is left untouched.
#
# @return the result of initialize_socket, or nil when nothing was done
def maybe_initialize_socket
  return if @socket && !closed?

  initialize_socket
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


226
227
228
# File 'lib/bunny/transport.rb', line 226

# Tells whether a socket exists and is not closed.
#
# @return true/false for an existing socket; the falsy @socket value itself
#   (nil after construction) when there is no socket
def open?
  return @socket unless @socket

  !@socket.closed?
end

#post_initialize_socket ⇒ Object



317
318
319
320
321
322
323
# File 'lib/bunny/transport.rb', line 317

# Upgrades the freshly opened socket to TLS when this transport uses TLS and
# the socket is not a Bunny::SSLSocketImpl already. Otherwise the socket is
# kept as it is.
#
# @return the socket now stored in @socket
def post_initialize_socket
  needs_wrapping = uses_tls? && !@socket.is_a?(Bunny::SSLSocketImpl)
  @socket = wrap_in_tls_socket(@socket) if needs_wrapping
  @socket
end

#read_fully(count) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/bunny/transport.rb', line 238

# Reads count bytes from the socket, passing @read_timeout to the socket
# implementation.
#
# On a system call error, timeout, Bunny::ConnectionError or IOError the
# failure is logged, the socket is closed and the transport is marked
# :not_connected. Then, if the session recovers automatically, the original
# exception is re-raised; otherwise a Bunny::NetworkFailure wrapping it is
# handed to the session error handler.
#
# @param count number of bytes requested from the socket
# @return the data returned by the socket, or the error handler's result
def read_fully(count)
  @socket.read_fully(count, @read_timeout)
rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => failure
  @logger.error "Got an exception when receiving data: #{failure.message} (#{failure.class.name})"
  close
  @status = :not_connected

  # With automatic recovery the caller is expected to deal with the original error
  raise if @session.automatically_recover?

  @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{failure.message}", failure))
end

#read_next_frame(opts = {}) ⇒ Object

Exposed primarily for Bunny::Channel

Raises:

  • (BadLengthError)
  • (NoFinalOctetError)



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/bunny/transport.rb', line 261

# Reads one complete frame from the socket: a 7-byte header, a payload of the
# size announced in the header (skipped when the size is 0), and one trailing
# byte. The opts argument is accepted but not used by this method.
#
# @return [AMQ::Protocol::Frame] frame built from the decoded type, payload and channel
# @raise [BadLengthError] when the payload read differs in size from the header
# @raise [NoFinalOctetError] when the trailing byte is not Frame::FINAL_OCTET
def read_next_frame(opts = {})
  frame_header        = read_fully(7)
  type, channel, size = AMQ::Protocol::Frame.decode_header(frame_header)
  payload             = size > 0 ? read_fully(size) : ''
  # The trailing byte is consumed before validation, as in the wire order
  final_octet         = read_fully(1)

  # 1) the size is miscalculated
  raise BadLengthError.new(size, payload.bytesize) unless payload.bytesize == size

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  raise NoFinalOctetError.new unless final_octet == AMQ::Protocol::Frame::FINAL_OCTET

  AMQ::Protocol::Frame.new(type, payload, channel)
end

#read_ready?(timeout = nil) ⇒ Boolean

Returns:

  • (Boolean)


254
255
256
257
# File 'lib/bunny/transport.rb', line 254

# Waits with IO.select until the socket has data to read or the timeout
# elapses. A nil @socket is dropped from the watch list before selecting.
#
# @param timeout passed to IO.select as its timeout (nil is forwarded as is)
# @return true when the socket is in the readable set; nil when IO.select
#   returned nothing
def read_ready?(timeout = nil)
  watched    = [@socket].compact
  ready_sets = IO.select(watched, nil, nil, timeout)

  ready_sets && ready_sets.first.include?(@socket)
end

#send_frame(frame) ⇒ Object

Sends frame to the peer.



201
202
203
204
205
206
207
# File 'lib/bunny/transport.rb', line 201

# Encodes the frame and writes it through #write.
# When the transport is closed nothing is written; a ConnectionClosedError
# built from the frame is reported to the session as a network failure.
#
# @param frame object responding to #encode
# @return the result of #write, or of the session's failure handler
def send_frame(frame)
  return @session.handle_network_failure(ConnectionClosedError.new(frame)) if closed?

  write(frame.encode)
end

#send_frame_without_timeout(frame) ⇒ Object

Sends frame to the peer without timeout control.



213
214
215
216
217
218
219
# File 'lib/bunny/transport.rb', line 213

# Encodes the frame and writes it through #write_without_timeout.
# When the transport is closed nothing is written; a ConnectionClosedError
# built from the frame is reported to the session as a network failure.
#
# @param frame object responding to #encode
# @return the result of #write_without_timeout, or of the session's failure handler
def send_frame_without_timeout(frame)
  return @session.handle_network_failure(ConnectionClosedError.new(frame)) if closed?

  write_without_timeout(frame.encode)
end

#uses_ssl? ⇒ Boolean Also known as: ssl?

Returns:

  • (Boolean)


104
105
106
# File 'lib/bunny/transport.rb', line 104

# Tells whether this transport uses TLS. Returns @tls_enabled, which the
# constructor computes with tls_enabled?(opts); same answer as #uses_tls?.
def uses_ssl?
  @tls_enabled
end

#uses_tls? ⇒ Boolean Also known as: tls?

Returns:

  • (Boolean)


99
100
101
# File 'lib/bunny/transport.rb', line 99

# Tells whether this transport uses TLS. Returns @tls_enabled, which the
# constructor computes with tls_enabled?(opts). #connect and
# #post_initialize_socket branch on this value.
def uses_tls?
  @tls_enabled
end

#write(data) ⇒ Object

Writes data to the socket. If read/write timeout was specified the operation will return after that amount of time has elapsed waiting for the socket.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/bunny/transport.rb', line 158

# Writes data to the socket, honouring the write timeout.
#
# Without a write timeout the call is handed to #write_without_timeout.
# Otherwise, if the socket is open, the data goes out through
# write_nonblock_fully while holding the writes mutex; when the socket is not
# open nothing is written and nil is returned.
#
# On a system call error, timeout, Bunny::ConnectionError or IOError the
# failure is logged, the socket is closed and the transport is marked
# :not_connected. The error then goes to the session's network failure
# handling when automatic recovery is on, or is wrapped in a
# Bunny::NetworkFailure and handed to the session error handler otherwise.
#
# @param data payload to send
# @return the result of the socket write, nil when not open, or the result
#   of the failure handling
def write(data)
  # Kept outside the begin block: failures of the untimed path are handled there
  return write_without_timeout(data) unless @write_timeout

  begin
    @writes_mutex.synchronize { @socket.write_nonblock_fully(data, @write_timeout) } if open?
  rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => failure
    @logger.error "Got an exception when sending data: #{failure.message} (#{failure.class.name})"
    close
    @status = :not_connected

    unless @session.automatically_recover?
      return @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{failure.message}", failure))
    end

    @session.handle_network_failure(failure)
  end
end

#write_without_timeout(data, raise_exceptions = false) ⇒ Object

Writes data to the socket without timeout checks



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/bunny/transport.rb', line 181

# Writes data to the socket with a plain blocking write (under the writes
# mutex) and then flushes the socket. No timeout is applied.
#
# On a system call error, Bunny::ConnectionError or IOError the socket is
# closed. With raise_exceptions set the error is re-raised to the caller.
# Otherwise it goes to the session's network failure handling when automatic
# recovery is on, or is wrapped in a Bunny::NetworkFailure and handed to the
# session error handler.
#
# @param data payload to send
# @param raise_exceptions [Boolean] re-raise write errors instead of routing
#   them through the session
# @return the result of the socket flush, or of the failure handling
def write_without_timeout(data, raise_exceptions = false)
  @writes_mutex.synchronize { @socket.write(data) }
  @socket.flush
rescue SystemCallError, Bunny::ConnectionError, IOError => failure
  close
  raise failure if raise_exceptions

  unless @session.automatically_recover?
    return @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{failure.message}", failure))
  end

  @session.handle_network_failure(failure)
end