Class: RabbitMQ::Client::Connection Private

Inherits:
Object
Defined in:
lib/rabbitmq/client/connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Represents a connection to a single RabbitMQ server. Used internally by the RabbitMQ::Client class.


Constructor Details

#initialize(*args) ⇒ Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Connection.



# File 'lib/rabbitmq/client/connection.rb', line 17

def initialize(*args)
  @ptr = FFI.amqp_new_connection
  @frame = FFI::Frame.new
  
  create_socket!
  
  info = Util.connection_info(*args)
  @options = {
    ssl:                   (info.fetch(:ssl) ? true : false),
    host:                   info.fetch(:host).to_s,
    port:           Integer(info.fetch(:port)),
    user:                   info.fetch(:user).to_s,
    password:               info.fetch(:password).to_s,
    vhost:                  info.fetch(:vhost).to_s,
    max_channels:   Integer(info.fetch(:max_channels, FFI::CHANNEL_MAX_ID)),
    max_frame_size: Integer(info.fetch(:max_frame_size, 131072)),
    heartbeat_interval: 0, # not fully implemented in librabbitmq
  }
  
  @finalizer = self.class.create_finalizer_for(@ptr)
  ObjectSpace.define_finalizer(self, @finalizer)
end
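The constructor coerces every connection setting to a fixed type before storing it. A minimal sketch of that normalization, assuming a plain Hash stands in for whatever `Util.connection_info` returns; `normalize_options` and `DEFAULT_MAX_FRAME_SIZE` are hypothetical names used only for illustration:

```ruby
# Sketch only: normalize_options is a hypothetical helper, and the info Hash
# stands in for the result of Util.connection_info.
DEFAULT_MAX_FRAME_SIZE = 131_072

def normalize_options(info)
  {
    ssl:            info.fetch(:ssl) ? true : false,  # fetch raises KeyError if the key is missing
    host:           info.fetch(:host).to_s,
    port:           Integer(info.fetch(:port)),       # Integer() raises ArgumentError on "abc"
    max_frame_size: Integer(info.fetch(:max_frame_size, DEFAULT_MAX_FRAME_SIZE)),
  }
end

opts = normalize_options({ ssl: nil, host: :localhost, port: "5672" })
p opts
```

Using `fetch` without a default makes a missing required key fail immediately at construction time rather than later, when the socket is opened.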

Instance Attribute Details

#options ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 15

def options
  @options
end

#ptr ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:



# File 'lib/rabbitmq/client/connection.rb', line 14

def ptr
  @ptr
end

Class Method Details

.create_finalizer_for(ptr) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 49

def self.create_finalizer_for(ptr)
  Proc.new do
    FFI.amqp_connection_close(ptr, 200)
    FFI.amqp_destroy_connection(ptr)
  end
end

Instance Method Details

#close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:

  • (DestroyedError) if the connection has already been destroyed



# File 'lib/rabbitmq/client/connection.rb', line 66

def close
  raise DestroyedError unless @ptr
  FFI.amqp_connection_close(@ptr, 200)
end

#connect_socket! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

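Raises:

  • (DestroyedError) if the connection has already been destroyed
  • (NotImplementedError) if the :ssl option is set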



# File 'lib/rabbitmq/client/connection.rb', line 78

def connect_socket!
  raise DestroyedError unless @ptr
  raise NotImplementedError if @options[:ssl]
  
  create_socket!
  Util.error_check :"opening a socket",
    FFI.amqp_socket_open(@socket, @options[:host], @options[:port])
  
  @ruby_socket = Socket.for_fd(FFI.amqp_get_sockfd(@ptr))
  @ruby_socket.autoclose = false
end

#create_socket! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:

  • (DestroyedError) if the connection has already been destroyed



# File 'lib/rabbitmq/client/connection.rb', line 71

def create_socket!
  raise DestroyedError unless @ptr
  
  @socket = FFI.amqp_tcp_socket_new(@ptr)
  Util.null_check :"creating a socket", @socket
end

#destroy ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 40

def destroy
  if @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer(self)
  end
  @ptr = @socket = @finalizer = nil
end
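The pairing of `.create_finalizer_for` with `#destroy` follows a common Ruby pattern for native resources. A minimal sketch, assuming a hypothetical `Handle` class whose `released` log stands in for the native close and destroy calls. One plausible reason for building the Proc in a class method is that it then closes over its arguments only, not over the instance being finalized:

```ruby
# Sketch only: Handle and its released log are hypothetical stand-ins for a
# native resource; they are not part of the library.
class Handle
  attr_reader :released

  # Built in a class method so the Proc closes over id and log only.
  # A Proc created inside an instance method would capture self, and the
  # object could then never be garbage collected.
  def self.create_finalizer_for(id, log)
    proc { log << id }
  end

  def initialize(id)
    @released  = []
    @finalizer = self.class.create_finalizer_for(id, @released)
    ObjectSpace.define_finalizer(self, @finalizer)
  end

  # Release eagerly, then unregister the finalizer so it cannot run twice.
  def destroy
    return unless @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer(self)
    @finalizer = nil
  end
end

handle = Handle.new(42)
handle.destroy
handle.destroy      # second call is a no-op
p handle.released   # prints [42]
```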

#fetch_next_event(timeout = 0, start = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Fetch as many frames as are needed to assemble the next discrete event. Returns the event as a Hash, or nil if the timeout expires before the event is complete.



# File 'lib/rabbitmq/client/connection.rb', line 143

def fetch_next_event(timeout=0, start=Time.now)
  garbage_collect
  
  frame = fetch_next_frame(timeout, start)
  return unless frame
  event = frame.as_method_to_h(false)
  return event unless FFI::Method.has_content?(event.fetch(:method))
  
  frame = fetch_next_frame(timeout, start)
  return unless frame
  event.merge!(frame.as_header_to_h)
  
  body = "".force_encoding(Encoding::ASCII_8BIT)
  while body.size < event.fetch(:body_size)
    frame = fetch_next_frame(timeout, start)
    return unless frame
    body.concat frame.as_body_to_s
  end
  
  event[:body] = body
  event
end
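The assembly logic above (a method frame, then an optional header frame, then body frames until `body_size` bytes have arrived) can be sketched without the native library. This sketch assumes frames are modelled as plain Hashes; `assemble_event` and the `:has_content` key are hypothetical and stand in for `fetch_next_frame` and `FFI::Method.has_content?`:

```ruby
# Sketch only: frames are plain Hashes, and next_frame is a hypothetical
# stand-in for fetch_next_frame that returns nil when no frame is available.
def assemble_event(frames)
  next_frame = -> { frames.shift }

  method_frame = next_frame.call
  return nil unless method_frame
  event = { method: method_frame[:method] }
  return event unless method_frame[:has_content]  # methods without content are complete

  header = next_frame.call
  return nil unless header
  event[:body_size] = header[:body_size]

  # Accumulate body frames as binary data until the announced size is reached.
  body = String.new(encoding: Encoding::ASCII_8BIT)
  while body.bytesize < event[:body_size]
    frame = next_frame.call
    return nil unless frame
    body << frame[:body].b
  end

  event[:body] = body
  event
end

frames = [
  { method: :basic_deliver, has_content: true },
  { body_size: 11 },
  { body: "hello " },
  { body: "world" },
]
p assemble_event(frames)
```

As in the original method, running out of frames partway through returns nil and discards the partially assembled event.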

#fetch_next_frame(timeout = 0, start = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the next available frame, or nil if the timeout expires before a frame arrives.



# File 'lib/rabbitmq/client/connection.rb', line 122

def fetch_next_frame(timeout=0, start=Time.now)
  frame = @frame
  
  # Try fetching the next frame without a blocking call.
  status = FFI.amqp_simple_wait_frame_noblock(@ptr, frame, FFI::Timeval.zero)
  case status
  when :ok;      return frame
  when :timeout; # do nothing and proceed to waiting on select below
  else Util.error_check :"fetching the next frame", status
  end
  
  # Otherwise, wait for the socket to be readable and try fetching again.
  return nil unless select(timeout, start)
  Util.error_check :"fetching the next frame",
    FFI.amqp_simple_wait_frame(@ptr, frame)
  
  frame
end
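The "try without blocking, then wait, then read" shape used here can be illustrated on an ordinary Ruby IO object. A minimal sketch, assuming a pipe in place of the AMQP socket; `read_chunk` is a hypothetical helper and does not call into librabbitmq:

```ruby
# Sketch only: read_chunk is a hypothetical helper that mirrors the control
# flow on a plain Ruby IO instead of an AMQP connection.
def read_chunk(io, timeout)
  # First attempt never blocks; it returns :wait_readable when nothing is pending.
  chunk = io.read_nonblock(4096, exception: false)
  return chunk unless chunk == :wait_readable

  # Nothing was pending: wait until readable, or give up when the timeout expires.
  return nil unless IO.select([io], [], [], timeout)
  io.read_nonblock(4096, exception: false)
end

reader, writer = IO.pipe
p read_chunk(reader, 0.05)   # nothing written yet, so this prints nil
writer.write("frame-1")
writer.flush
p read_chunk(reader, 0.05)   # prints "frame-1"
```

Trying the non-blocking read first avoids a needless `IO.select` call when data is already buffered.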

#garbage_collect ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 174

def garbage_collect
  FFI.amqp_maybe_release_buffers(ptr)
end

#garbage_collect_channel(channel_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 178

def garbage_collect_channel(channel_id)
  FFI.amqp_maybe_release_buffers_on_channel(ptr, channel_id)
end

#login! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

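Raises:

  • (DestroyedError) if the connection has already been destroyed
  • (NotImplementedError) if the server replies to the login with an exception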



# File 'lib/rabbitmq/client/connection.rb', line 90

def login!
  raise DestroyedError unless @ptr
  
  res = FFI.amqp_login(@ptr, @options[:vhost],
    @options[:max_channels], @options[:max_frame_size],
    @options[:heartbeat_interval], :plain,
    :string, @options[:user], :string, @options[:password])
  
  case res[:reply_type]
  when :library_exception; Util.error_check :"logging in", res[:library_error]
  when :server_exception;  raise NotImplementedError
  end
  
  @server_properties = FFI::Table.new(FFI.amqp_get_server_properties(@ptr)).to_h
end

#remaining_timeout(timeout = 0, start = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Calculate how much of the timeout remains, measured from the given start time. Returns nil if timeout is nil, and 0 if the timeout has already elapsed.



# File 'lib/rabbitmq/client/connection.rb', line 107

def remaining_timeout(timeout=0, start=Time.now)
  return nil unless timeout
  timeout = timeout - (Time.now - start)
  timeout < 0 ? 0 : timeout
end
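Because one `start` time is shared across several calls, each wait consumes part of a single overall budget instead of restarting the clock. A standalone sketch of that arithmetic, written as a free function for illustration:

```ruby
# Standalone sketch of the deadline arithmetic (not the library's own code).
# timeout: total seconds allowed, or nil to wait indefinitely.
# start:   the Time at which the overall operation began.
def remaining_timeout(timeout = 0, start = Time.now)
  return nil if timeout.nil?           # nil means "no deadline"
  left = timeout - (Time.now - start)  # seconds still available
  left.negative? ? 0 : left            # clamp so callers never see a negative wait
end

started = Time.now - 2                 # pretend the operation began 2 seconds ago
puts remaining_timeout(5, started)     # a little under 3 seconds
puts remaining_timeout(1, started)     # budget already spent, so 0
p    remaining_timeout(nil, started)   # nil: wait indefinitely
```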

#select(timeout = 0, start = Time.now) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until the internal Ruby socket has readable data. Returns true if data is available, or false if the timeout expires first.



# File 'lib/rabbitmq/client/connection.rb', line 115

def select(timeout=0, start=Time.now)
  IO.select([@ruby_socket], [], [],
    remaining_timeout(timeout, start)
  ) ? true : false
end
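`IO.select` returns nil when the timeout expires and an array of ready IO objects otherwise, which is why the method collapses the result to a boolean. A minimal sketch on a pipe, assuming a hypothetical `readable?` helper in place of the instance method:

```ruby
# Sketch only: readable? is a hypothetical helper, not part of the library.
def readable?(io, timeout)
  # IO.select returns nil on timeout, or arrays of ready IO objects otherwise.
  IO.select([io], [], [], timeout) ? true : false
end

reader, writer = IO.pipe
before = readable?(reader, 0.05)   # nothing written yet, so the wait times out
writer.write("frame")
writer.flush
after = readable?(reader, 0.05)    # data is pending, so select returns at once
p [before, after]                  # prints [false, true]
```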

#send_method(channel_id, method, properties = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 166

def send_method(channel_id, method, properties={})
  req    = FFI::Method.lookup_class(method).new.apply(properties)
  status = FFI.amqp_send_method(ptr, channel_id, method, req.pointer)
  
  req.free!
  status
end

#start ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/rabbitmq/client/connection.rb', line 61

def start
  connect_socket!
  login!
end