Class: Kafka::Connection
- Inherits: Object
  - Object
  - Kafka::Connection
- Defined in: lib/kafka/connection.rb
Overview
A connection to a single Kafka broker.
Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently the leader for the set of topic partitions you want to produce to or consume from.
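The one-connection-per-broker idea can be sketched as a small cache keyed by broker address. This is only an illustration: `StubConnection` and `BrokerPool` are hypothetical names that are not part of this library, and the stub stands in for Kafka::Connection so the sketch runs without a broker.

```ruby
# Hypothetical stand-in for Kafka::Connection; it opens no socket.
class StubConnection
  def initialize(host:, port:)
    @host, @port = host, port
  end

  def to_s
    "#{@host}:#{@port}"
  end
end

# Hypothetical pool that lazily creates and caches one connection per broker.
class BrokerPool
  def initialize
    @connections = {}
  end

  # Returns the cached connection for host:port, creating it on first use.
  def connection_for(host, port)
    @connections["#{host}:#{port}"] ||= StubConnection.new(host: host, port: port)
  end

  def size
    @connections.size
  end
end

pool = BrokerPool.new
leader_a = pool.connection_for("kafka1", 9092)
leader_b = pool.connection_for("kafka2", 9092)
same_a   = pool.connection_for("kafka1", 9092) # reuses the first connection
```

Requests for partitions led by `kafka1:9092` would then go through `leader_a`, and requests for partitions led by `kafka2:9092` through `leader_b`.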
Constant Summary
- SOCKET_TIMEOUT = 5
- CONNECT_TIMEOUT = 10
Instance Method Summary
- #close ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil) ⇒ Connection (constructor)
  Opens a connection to a Kafka broker.
- #request(api_key, request, response_class) ⇒ Object
  Sends a request over the connection.
- #to_s ⇒ Object
Constructor Details
#initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil) ⇒ Connection
Opens a connection to a Kafka broker.
    # File 'lib/kafka/connection.rb', line 32
    def initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil)
      @host, @port, @client_id = host, port, client_id
      @logger = logger

      @connect_timeout = connect_timeout || CONNECT_TIMEOUT
      @socket_timeout = socket_timeout || SOCKET_TIMEOUT

      @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."

      connect
    end
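The constructor falls back to the class constants whenever a timeout argument is nil. A minimal sketch of that fallback follows; the `TimeoutDefaults` module and its `resolve` method are hypothetical and exist only to make the behavior runnable in isolation.

```ruby
# Hypothetical module mirroring the constants from the Constant Summary.
module TimeoutDefaults
  SOCKET_TIMEOUT = 5
  CONNECT_TIMEOUT = 10

  # Applies the same `||` fallback the constructor uses.
  def self.resolve(connect_timeout: nil, socket_timeout: nil)
    {
      connect_timeout: connect_timeout || CONNECT_TIMEOUT,
      socket_timeout: socket_timeout || SOCKET_TIMEOUT,
    }
  end
end

defaults = TimeoutDefaults.resolve                     # both values fall back
custom   = TimeoutDefaults.resolve(connect_timeout: 2) # only socket_timeout falls back
```

Because the fallback uses `||`, passing nil explicitly behaves the same as omitting the argument.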
Instance Method Details
#close ⇒ Object
    # File 'lib/kafka/connection.rb', line 68
    def close
      @logger.debug "Closing socket to #{to_s}"

      @socket.close if @socket

      @socket = nil
    end
#connect ⇒ Object
    # File 'lib/kafka/connection.rb', line 48
    def connect
      @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout)

      @encoder = Kafka::Protocol::Encoder.new(@socket)
      @decoder = Kafka::Protocol::Decoder.new(@socket)

      # Correlation id is initialized to zero and bumped for each request.
      @correlation_id = 0
    rescue Errno::ETIMEDOUT => e
      @logger.error "Timed out while trying to connect to #{self}: #{e}"
      raise ConnectionError, e
    rescue SocketError, Errno::ECONNREFUSED => e
      @logger.error "Failed to connect to #{self}: #{e}"
      raise ConnectionError, e
    end
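The rescue clauses in #connect translate low-level socket failures into a single error type, so callers only have to handle one exception. The sketch below isolates that pattern. It assumes a stand-in `ConnectionError` class and a hypothetical `with_connection_errors` helper, neither of which is the library's own code, and it builds a descriptive message where the method above passes the original exception straight through.

```ruby
require "socket" # defines SocketError

# Hypothetical stand-in for the library's ConnectionError.
class ConnectionError < StandardError; end

# Hypothetical helper: runs the block and re-raises low-level connect
# failures as ConnectionError, mirroring the rescue clauses in #connect.
def with_connection_errors(target)
  yield
rescue Errno::ETIMEDOUT => e
  raise ConnectionError, "Timed out while trying to connect to #{target}: #{e}"
rescue SocketError, Errno::ECONNREFUSED => e
  raise ConnectionError, "Failed to connect to #{target}: #{e}"
end

# Simulate a refused connection instead of touching the network.
begin
  with_connection_errors("localhost:9092") { raise Errno::ECONNREFUSED }
rescue ConnectionError => e
  message = e.message
end
```

A caller can then rescue `ConnectionError` alone, whether the underlying cause was a timeout, a refused connection, or a name resolution failure.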
#connected? ⇒ Boolean
    # File 'lib/kafka/connection.rb', line 64
    def connected?
      !@socket.nil?
    end
#request(api_key, request, response_class) ⇒ Object
Sends a request over the connection.
    # File 'lib/kafka/connection.rb', line 83
    def request(api_key, request, response_class)
      connect unless connected?

      write_request(api_key, request)

      unless response_class.nil?
        loop do
          correlation_id, response = read_response(response_class)

          # There may have been a previous request that timed out before the client
          # was able to read the response. In that case, the response will still be
          # sitting in the socket waiting to be read. If the response we just read
          # was to a previous request, we can safely skip it.
          if correlation_id < @correlation_id
            @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}"
          elsif correlation_id > @correlation_id
            raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}"
          else
            break response
          end
        end
      end
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
      @logger.error "Connection error: #{e}"

      close

      raise ConnectionError, "Connection error: #{e}"
    end
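The correlation id check in #request can be exercised in isolation. In the sketch below an array of `[correlation_id, response]` pairs stands in for the socket, and `read_matching_response` and `ProtocolError` are hypothetical names introduced for illustration. It follows the same three branches: skip older ids, raise on newer ids, and return on a match.

```ruby
# Hypothetical stand-in for Kafka::Error.
class ProtocolError < StandardError; end

# Returns the response whose correlation id equals expected_id.
# `pending` is an array of [correlation_id, response] pairs that stands in
# for responses waiting on the socket; entries are consumed as they are read.
def read_matching_response(pending, expected_id)
  loop do
    correlation_id, response = pending.shift
    raise EOFError, "no more responses" if correlation_id.nil?

    if correlation_id < expected_id
      # Stale response to an earlier request that timed out; skip it.
      next
    elsif correlation_id > expected_id
      raise ProtocolError, "Correlation id mismatch: expected #{expected_id} but got #{correlation_id}"
    else
      return response
    end
  end
end

pending = [[1, "stale"], [2, "also stale"], [3, "current"]]
result = read_matching_response(pending, 3)
```

Skipping is safe only for smaller ids, since those belong to requests the client has already given up on. A larger id would mean a response to a request that has not been sent yet, so the sketch treats it as an error, as the method above does.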
#to_s ⇒ Object
    # File 'lib/kafka/connection.rb', line 44
    def to_s
      "#{@host}:#{@port}"
    end