Class: Kafka::Connection
- Inherits: Object
  - Object
  - Kafka::Connection
- Defined in: lib/kafka/connection.rb
Overview
A connection to a single Kafka broker.
Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently leader for the set of topic partitions you want to produce to or consume from.
Instrumentation
Connections emit a request.connection.kafka notification on each request. The following keys will be found in the payload:
- :broker_host - the host of the broker the request is sent to.
- :api - the name of the API being invoked.
- :request_size - the number of bytes in the request.
- :response_size - the number of bytes in the response.
The notification also includes the duration of the request.
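The notifications described above could be consumed roughly as follows. This is a minimal sketch that assumes ActiveSupport::Notifications is the backend behind the instrumenter, which this page does not state. The helper name `describe_request` is hypothetical, and the subscription is guarded so the sketch still runs when ActiveSupport is not loaded.

```ruby
# Hypothetical helper: formats one notification payload as a log line.
# The payload keys are the ones listed above; the duration is passed separately.
def describe_request(payload, duration_ms)
  format(
    "%s: %d bytes out, %d bytes in, %.1f ms",
    payload[:api], payload[:request_size], payload[:response_size], duration_ms
  )
end

# Subscribe only when ActiveSupport::Notifications is loaded (assumed backend).
if defined?(ActiveSupport::Notifications)
  ActiveSupport::Notifications.subscribe("request.connection.kafka") do |*args|
    event = ActiveSupport::Notifications::Event.new(*args)
    puts describe_request(event.payload, event.duration)
  end
end

# Stand-alone demonstration with a made-up payload.
puts describe_request({ api: :fetch, request_size: 120, response_size: 2048 }, 3.5)
```

Keeping the formatting in a plain method makes it possible to exercise the handler without a broker or a notification backend.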
Constant Summary
- SOCKET_TIMEOUT = 10
- CONNECT_TIMEOUT = 10
- IDLE_TIMEOUT = 60 * 5
  Time after which an idle connection will be reopened.
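The idle check itself is not shown on this page. A minimal sketch of how IDLE_TIMEOUT might be applied, assuming the value is in seconds and that the time of the last request is tracked; the helper `idle?` below and its `now:` parameter are hypothetical:

```ruby
IDLE_TIMEOUT = 60 * 5 # value of the constant above; assumed to be in seconds

# Hypothetical helper: true when the last request is older than the timeout.
# A connection that has never sent a request (nil) is not considered idle.
def idle?(last_request, now: Time.now)
  !last_request.nil? && last_request < now - IDLE_TIMEOUT
end

now = Time.now
puts idle?(nil, now: now)        # never used
puts idle?(now - 10, now: now)   # used 10 seconds ago
puts idle?(now - 301, now: now)  # used just over five minutes ago
```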
Instance Attribute Summary
- #decoder ⇒ Object (readonly)
  Returns the value of attribute decoder.
- #encoder ⇒ Object (readonly)
  Returns the value of attribute encoder.
Instance Method Summary
- #address_match?(host, port) ⇒ Boolean
- #close ⇒ Object
- #initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection (constructor)
  Opens a connection to a Kafka broker.
- #open? ⇒ Boolean
- #send_request(request) ⇒ Object
  Sends a request over the connection.
- #to_s ⇒ Object
Constructor Details
#initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection
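Opens a connection to a Kafka broker. The constructor itself only stores its arguments and applies the default timeouts; the socket is opened lazily, on the first call to #send_request.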
# File 'lib/kafka/connection.rb', line 51
def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
  @host, @port, @client_id = host, port, client_id
  @logger = logger
  @instrumenter = instrumenter
  @connect_timeout = connect_timeout || CONNECT_TIMEOUT
  @socket_timeout = socket_timeout || SOCKET_TIMEOUT
  @ssl_context = ssl_context
  @sasl_authenticator = sasl_authenticator
end
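The timeout fallback in the constructor can be illustrated in isolation. The class `TimeoutDefaults` below is a hypothetical stand-in that mirrors only the `||` handling above; it is not part of the library.

```ruby
# Hypothetical stand-in that mirrors only the timeout handling shown above.
class TimeoutDefaults
  SOCKET_TIMEOUT = 10
  CONNECT_TIMEOUT = 10

  attr_reader :connect_timeout, :socket_timeout

  def initialize(connect_timeout: nil, socket_timeout: nil)
    # nil (or false) falls back to the class-level default.
    @connect_timeout = connect_timeout || CONNECT_TIMEOUT
    @socket_timeout = socket_timeout || SOCKET_TIMEOUT
  end
end

defaults = TimeoutDefaults.new
custom = TimeoutDefaults.new(connect_timeout: 2, socket_timeout: 30)

puts defaults.connect_timeout # falls back to CONNECT_TIMEOUT
puts custom.socket_timeout    # explicit value is kept
```

Because `0` is truthy in Ruby, an explicit timeout of 0 is kept rather than replaced by the default.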
Instance Attribute Details
#decoder ⇒ Object (readonly)
Returns the value of attribute decoder.
# File 'lib/kafka/connection.rb', line 35
def decoder
  @decoder
end
#encoder ⇒ Object (readonly)
Returns the value of attribute encoder.
# File 'lib/kafka/connection.rb', line 34
def encoder
  @encoder
end
Instance Method Details
#address_match?(host, port) ⇒ Boolean
# File 'lib/kafka/connection.rb', line 62
def address_match?(host, port)
  @host == host && @port == port
end
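Since a cluster usually needs one connection per broker, a caller might use this check to find an existing connection for a given broker address. A minimal sketch, using a hypothetical `BrokerStub` struct that copies the comparison above:

```ruby
# Hypothetical stand-in with the same address_match? logic as above.
BrokerStub = Struct.new(:host, :port) do
  def address_match?(host, port)
    self.host == host && self.port == port
  end
end

connections = [BrokerStub.new("kafka1", 9092), BrokerStub.new("kafka2", 9092)]

# Find the existing connection for a broker, if any.
match = connections.find { |c| c.address_match?("kafka2", 9092) }
puts match.host
```

The comparison is exact, so a port passed as the string "9092" does not match a connection created with the integer 9092.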
#close ⇒ Object
# File 'lib/kafka/connection.rb', line 74
def close
  @logger.debug "Closing socket to #{to_s}"

  @socket.close if @socket

  @socket = nil
end
#open? ⇒ Boolean
# File 'lib/kafka/connection.rb', line 70
def open?
  !@socket.nil? && !@socket.closed?
end
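The interplay of #open? and #close can be tried without a broker. In this sketch the class `SocketHolder` is a hypothetical stand-in and a StringIO plays the role of the socket; only the two method bodies are taken from the source above (minus the logging).

```ruby
require "stringio"

# Hypothetical stand-in: StringIO plays the role of the socket.
class SocketHolder
  def initialize(socket = nil)
    @socket = socket
  end

  def open?
    !@socket.nil? && !@socket.closed?
  end

  def close
    @socket.close if @socket
    @socket = nil
  end
end

holder = SocketHolder.new(StringIO.new)
puts holder.open? # socket present and not closed
holder.close
puts holder.open? # socket reference has been cleared
holder.close      # safe to call again: @socket is already nil
```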
#send_request(request) ⇒ Object
Sends a request over the connection.
# File 'lib/kafka/connection.rb', line 88
def send_request(request)
  # Default notification payload.
  notification = {
    broker_host: @host,
    api: Protocol.api_name(request.api_key),
    request_size: 0,
    response_size: 0,
  }

  @instrumenter.instrument("request.connection", notification) do
    open unless open?

    reopen if idle?

    @correlation_id += 1

    write_request(request, notification)

    response_class = request.response_class
    response = wait_for_response(response_class, notification) unless response_class.nil?

    @last_request = Time.now

    response
  end
rescue SystemCallError, EOFError => e
  close

  raise ConnectionError, "Connection error #{e.class}: #{e}"
end
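The rescue clause above funnels low-level I/O failures into a single error class, so callers only need to handle one exception type. A minimal sketch of that pattern in isolation; `SketchConnectionError` and `with_connection_errors` are hypothetical names, not part of the library:

```ruby
# Hypothetical stand-in for the library's ConnectionError.
class SketchConnectionError < StandardError; end

# Runs the block and converts low-level I/O failures into one error class,
# mirroring the rescue clause above.
def with_connection_errors
  yield
rescue SystemCallError, EOFError => e
  raise SketchConnectionError, "Connection error #{e.class}: #{e}"
end

begin
  # Errno::* classes are subclasses of SystemCallError.
  with_connection_errors { raise Errno::ECONNRESET }
rescue SketchConnectionError => e
  puts e.message
end
```

Errors outside those two families, such as ArgumentError, pass through unchanged.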
#to_s ⇒ Object
# File 'lib/kafka/connection.rb', line 66
def to_s
  "#{@host}:#{@port}"
end