Class: RabbitMQ::Client::Connection Private
- Inherits:
-
Object
- Object
- RabbitMQ::Client::Connection
- Defined in:
- lib/rabbitmq/client/connection.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Represents a connection to a single RabbitMQ server. Used internally by the RabbitMQ::Client class.
Instance Attribute Summary collapse
- #options ⇒ Object readonly private
- #ptr ⇒ Object readonly private
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object private
- #connect_socket! ⇒ Object private
- #create_socket! ⇒ Object private
- #destroy ⇒ Object private
-
#fetch_next_event(timeout = 0, start = Time.now) ⇒ Object
private
Fetch the next one or more frames to form the next discrete event, returning the event as a Hash, or nil if time expired.
-
#fetch_next_frame(timeout = 0, start = Time.now) ⇒ Object
private
Return the next available frame, or nil if time expired.
- #garbage_collect ⇒ Object private
- #garbage_collect_channel(channel_id) ⇒ Object private
-
#initialize(*args) ⇒ Connection
constructor
private
A new instance of Connection.
- #login! ⇒ Object private
-
#remaining_timeout(timeout = 0, start = Time.now) ⇒ Object
private
Calculate the amount of the timeout remaining from the given start time.
-
#select(timeout = 0, start = Time.now) ⇒ Object
private
Block until there is readable data on the internal ruby socket, returning true if there is readable data, or false if time expired.
- #send_method(channel_id, method, properties = {}) ⇒ Object private
- #start ⇒ Object private
Constructor Details
#initialize(*args) ⇒ Connection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Connection.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/rabbitmq/client/connection.rb', line 17 def initialize(*args) @ptr = FFI.amqp_new_connection @frame = FFI::Frame.new create_socket! info = Util.connection_info(*args) @options = { ssl: (info.fetch(:ssl) ? true : false), host: info.fetch(:host).to_s, port: Integer(info.fetch(:port)), user: info.fetch(:user).to_s, password: info.fetch(:password).to_s, vhost: info.fetch(:vhost).to_s, max_channels: Integer(info.fetch(:max_channels, FFI::CHANNEL_MAX_ID)), max_frame_size: Integer(info.fetch(:max_frame_size, 131072)), heartbeat_interval: 0, # not fully implemented in librabbitmq } @finalizer = self.class.create_finalizer_for(@ptr) ObjectSpace.define_finalizer(self, @finalizer) end |
Instance Attribute Details
#options ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
15 16 17 |
# File 'lib/rabbitmq/client/connection.rb', line 15 def @options end |
#ptr ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
14 15 16 |
# File 'lib/rabbitmq/client/connection.rb', line 14 def ptr @ptr end |
Class Method Details
.create_finalizer_for(ptr) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
49 50 51 52 53 54 |
# File 'lib/rabbitmq/client/connection.rb', line 49 def self.create_finalizer_for(ptr) Proc.new do FFI.amqp_connection_close(ptr, 200) FFI.amqp_destroy_connection(ptr) end end |
Instance Method Details
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
66 67 68 69 |
# File 'lib/rabbitmq/client/connection.rb', line 66 def close raise DestroyedError unless @ptr FFI.amqp_connection_close(@ptr, 200) end |
#connect_socket! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rabbitmq/client/connection.rb', line 78 def connect_socket! raise DestroyedError unless @ptr raise NotImplementedError if @options[:ssl] create_socket! Util.error_check :"opening a socket", FFI.amqp_socket_open(@socket, @options[:host], @options[:port]) @ruby_socket = Socket.for_fd(FFI.amqp_get_sockfd(@ptr)) @ruby_socket.autoclose = false end |
#create_socket! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
71 72 73 74 75 76 |
# File 'lib/rabbitmq/client/connection.rb', line 71 def create_socket! raise DestroyedError unless @ptr @socket = FFI.amqp_tcp_socket_new(@ptr) Util.null_check :"creating a socket", @socket end |
#destroy ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
40 41 42 43 44 45 46 |
# File 'lib/rabbitmq/client/connection.rb', line 40 def destroy if @finalizer @finalizer.call ObjectSpace.undefine_finalizer(self) end @ptr = @socket = @finalizer = nil end |
#fetch_next_event(timeout = 0, start = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Fetch the next one or more frames to form the next discrete event, returning the event as a Hash, or nil if time expired.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/rabbitmq/client/connection.rb', line 143 def fetch_next_event(timeout=0, start=Time.now) garbage_collect frame = fetch_next_frame(timeout, start) return unless frame event = frame.as_method_to_h(false) return event unless FFI::Method.has_content?(event.fetch(:method)) frame = fetch_next_frame(timeout, start) return unless frame event.merge!(frame.as_header_to_h) body = "".force_encoding(Encoding::ASCII_8BIT) while body.size < event.fetch(:body_size) frame = fetch_next_frame(timeout, start) return unless frame body.concat frame.as_body_to_s end event[:body] = body event end |
#fetch_next_frame(timeout = 0, start = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the next available frame, or nil if time expired.
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/rabbitmq/client/connection.rb', line 122 def fetch_next_frame(timeout=0, start=Time.now) frame = @frame # Try fetching the next frame without a blocking call. status = FFI.amqp_simple_wait_frame_noblock(@ptr, frame, FFI::Timeval.zero) case status when :ok; return frame when :timeout; # do nothing and proceed to waiting on select below else Util.error_check :"fetching the next frame", status end # Otherwise, wait for the socket to be readable and try fetching again. return nil unless select(timeout, start) Util.error_check :"fetching the next frame", FFI.amqp_simple_wait_frame(@ptr, frame) frame end |
#garbage_collect ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
174 175 176 |
# File 'lib/rabbitmq/client/connection.rb', line 174 def garbage_collect FFI.amqp_maybe_release_buffers(ptr) end |
#garbage_collect_channel(channel_id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
178 179 180 |
# File 'lib/rabbitmq/client/connection.rb', line 178 def garbage_collect_channel(channel_id) FFI.amqp_maybe_release_buffers_on_channel(ptr, channel_id) end |
#login! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/rabbitmq/client/connection.rb', line 90 def login! raise DestroyedError unless @ptr res = FFI.amqp_login(@ptr, @options[:vhost], @options[:max_channels], @options[:max_frame_size], @options[:heartbeat_interval], :plain, :string, @options[:user], :string, @options[:password]) case res[:reply_type] when :library_exception; Util.error_check :"logging in", res[:library_error] when :server_exception; raise NotImplementedError end @server_properties = FFI::Table.new(FFI.amqp_get_server_properties(@ptr)).to_h end |
#remaining_timeout(timeout = 0, start = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Calculate the amount of the timeout remaining from the given start time
107 108 109 110 111 |
# File 'lib/rabbitmq/client/connection.rb', line 107 def remaining_timeout(timeout=0, start=Time.now) return nil unless timeout timeout = timeout - (Time.now - start) timeout < 0 ? 0 : timeout end |
#select(timeout = 0, start = Time.now) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Block until there is readable data on the internal ruby socket, returning true if there is readable data, or false if time expired.
115 116 117 118 119 |
# File 'lib/rabbitmq/client/connection.rb', line 115 def select(timeout=0, start=Time.now) IO.select([@ruby_socket], [], [], remaining_timeout(timeout, start) ) ? true : false end |
#send_method(channel_id, method, properties = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
166 167 168 169 170 171 172 |
# File 'lib/rabbitmq/client/connection.rb', line 166 def send_method(channel_id, method, properties={}) req = FFI::Method.lookup_class(method).new.apply(properties) status = FFI.amqp_send_method(ptr, channel_id, method, req.pointer) req.free! status end |
#start ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
61 62 63 64 |
# File 'lib/rabbitmq/client/connection.rb', line 61 def start connect_socket! login! end |