Class: Kafka::Connection
- Inherits:
-
Object
- Object
- Kafka::Connection
- Defined in:
- lib/kafka/connection.rb
Overview
A connection to a single Kafka broker.
Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently leader for the set of topic partitions you want to produce to or consumer from.
Instrumentation
Connections emit a request.connection.kafka
notification on each request. The following
keys will be found in the payload:
:api
— the name of the API being invoked.:request_size
— the number of bytes in the request.:response_size
— the number of bytes in the response.
The notification also includes the duration of the request.
Constant Summary collapse
- SOCKET_TIMEOUT =
10
- CONNECT_TIMEOUT =
10
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection
constructor
Opens a connection to a Kafka broker.
- #open? ⇒ Boolean
-
#send_request(request) ⇒ Object
Sends a request over the connection.
- #to_s ⇒ Object
Constructor Details
#initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection
Opens a connection to a Kafka broker.
45 46 47 48 49 50 51 52 53 |
# File 'lib/kafka/connection.rb', line 45 def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) @host, @port, @client_id = host, port, client_id @logger = logger @instrumenter = instrumenter @connect_timeout = connect_timeout || CONNECT_TIMEOUT @socket_timeout = socket_timeout || SOCKET_TIMEOUT @ssl_context = ssl_context end |
Instance Method Details
#close ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/kafka/connection.rb', line 63 def close @logger.debug "Closing socket to #{to_s}" @socket.close if @socket @socket = nil end |
#open? ⇒ Boolean
59 60 61 |
# File 'lib/kafka/connection.rb', line 59 def open? !@socket.nil? end |
#send_request(request) ⇒ Object
Sends a request over the connection.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/kafka/connection.rb', line 77 def send_request(request) # Default notification payload. notification = { broker_host: @host, api: Protocol.api_name(request.api_key), request_size: 0, response_size: 0, } @instrumenter.instrument("request.connection", notification) do open unless open? @correlation_id += 1 write_request(request, notification) response_class = request.response_class wait_for_response(response_class, notification) unless response_class.nil? end rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e close raise ConnectionError, "Connection error: #{e}" end |
#to_s ⇒ Object
55 56 57 |
# File 'lib/kafka/connection.rb', line 55 def to_s "#{@host}:#{@port}" end |