Class: Kafka::Connection

Inherits:
Object
Defined in:
lib/kafka/connection.rb

Overview

A connection to a single Kafka broker.

Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently the leader for the set of topic partitions you want to produce to or consume from.
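
As a rough illustration of the one-connection-per-broker idea, the sketch below keeps a registry keyed by "host:port" and opens each connection lazily. `FakeConnection` and `BrokerConnections` are hypothetical names invented for this example and are not part of the library; a real version would presumably construct `Kafka::Connection` instances instead.

```ruby
# Hypothetical stand-in for Kafka::Connection so the sketch runs without a broker.
FakeConnection = Struct.new(:host, :port) do
  def to_s
    "#{host}:#{port}"
  end
end

# Hypothetical registry: lazily opens one connection per broker and reuses it.
class BrokerConnections
  def initialize
    @connections = {}
  end

  # Returns the connection for the given broker, creating it on first use.
  def fetch(host, port)
    @connections["#{host}:#{port}"] ||= FakeConnection.new(host, port)
  end

  def size
    @connections.size
  end
end

pool = BrokerConnections.new
leader_a = pool.fetch("kafka1", 9092)
leader_b = pool.fetch("kafka2", 9092)
again_a  = pool.fetch("kafka1", 9092) # same object as leader_a
```

Requests for partitions led by different brokers would then go through different entries in the registry, while repeated requests to the same broker share one connection.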

Constant Summary

SOCKET_TIMEOUT = 5
CONNECT_TIMEOUT = 10

Instance Method Summary

Constructor Details

#initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil) ⇒ Connection

Opens a connection to a Kafka broker.

Parameters:

  • host (String)

    the hostname of the broker.

  • port (Integer)

    the port of the broker.

  • client_id (String)

    the client id is a user-specified string sent in each request to help trace calls and should logically identify the application making the request.

  • logger (Logger)

    the logger used to log trace messages.

  • connect_timeout (Integer) (defaults to: nil)

    the timeout, in seconds, for connecting to the broker. When nil, CONNECT_TIMEOUT (10 seconds) is used.

  • socket_timeout (Integer) (defaults to: nil)

    the timeout, in seconds, for reading from and writing to the broker. When nil, SOCKET_TIMEOUT (5 seconds) is used.



# File 'lib/kafka/connection.rb', line 32

def initialize(host:, port:, client_id:, logger:, connect_timeout: nil, socket_timeout: nil)
  @host, @port, @client_id = host, port, client_id
  @logger = logger

  @connect_timeout = connect_timeout || CONNECT_TIMEOUT
  @socket_timeout = socket_timeout || SOCKET_TIMEOUT

  @logger.info "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."

  connect
end
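
The `||` fallback in the constructor can be tried in isolation. The sketch below is a minimal, self-contained illustration; `resolve_timeouts` is a hypothetical helper written for this example, and the constants are redefined locally rather than loaded from the library.

```ruby
# Local copies of the documented defaults (seconds).
SOCKET_TIMEOUT = 5
CONNECT_TIMEOUT = 10

# Hypothetical helper mirroring the constructor's fallback:
# nil (or false) selects the default, any other value is kept.
def resolve_timeouts(connect_timeout: nil, socket_timeout: nil)
  {
    connect_timeout: connect_timeout || CONNECT_TIMEOUT,
    socket_timeout: socket_timeout || SOCKET_TIMEOUT,
  }
end

defaults = resolve_timeouts
custom   = resolve_timeouts(connect_timeout: 30)
zero     = resolve_timeouts(socket_timeout: 0)
```

Because `0` is truthy in Ruby, an explicit timeout of zero is passed through unchanged rather than replaced by the default; only `nil` and `false` trigger the fallback.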

Instance Method Details

#close ⇒ Object



# File 'lib/kafka/connection.rb', line 68

def close
  @logger.debug "Closing socket to #{to_s}"

  @socket.close if @socket

  @socket = nil
end

#connect ⇒ Object



# File 'lib/kafka/connection.rb', line 48

def connect
  @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout)

  @encoder = Kafka::Protocol::Encoder.new(@socket)
  @decoder = Kafka::Protocol::Decoder.new(@socket)

  # Correlation id is initialized to zero and bumped for each request.
  @correlation_id = 0
rescue Errno::ETIMEDOUT => e
  @logger.error "Timed out while trying to connect to #{self}: #{e}"
  raise ConnectionError, e
rescue SocketError, Errno::ECONNREFUSED => e
  @logger.error "Failed to connect to #{self}: #{e}"
  raise ConnectionError, e
end
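
The rescue clauses in `connect` translate low-level socket failures into a single library-level error. The following sketch shows that pattern on its own, under stated assumptions: `ConnectionError` here is a local stand-in class, `with_connection_errors` is a hypothetical helper, and the failure is simulated by raising `Errno::ECONNREFUSED` directly instead of contacting a broker.

```ruby
require "socket" # defines SocketError

# Local stand-in for the library's ConnectionError.
class ConnectionError < StandardError; end

# Hypothetical helper: runs the block and converts low-level socket
# failures into ConnectionError, as the rescue clauses in connect do.
def with_connection_errors
  yield
rescue Errno::ETIMEDOUT => e
  raise ConnectionError, "timed out: #{e.message}"
rescue SocketError, Errno::ECONNREFUSED => e
  raise ConnectionError, "failed to connect: #{e.message}"
end

result =
  begin
    # Simulated refusal; a real call would attempt to open a socket here.
    with_connection_errors { raise Errno::ECONNREFUSED }
  rescue ConnectionError => e
    e
  end
```

Callers then only need to rescue one error class, and Ruby keeps the original exception available through `cause` for diagnostics.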

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/kafka/connection.rb', line 64

def connected?
  !@socket.nil?
end
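
Note that `connected?` only checks whether `@socket` is nil; it does not probe the socket itself, and it is `close` that resets `@socket`. The sketch below illustrates how the two methods interact, using a `StringIO` as a stand-in for the socket. `SocketHolder` is a hypothetical class written for this example.

```ruby
require "stringio"

# Hypothetical minimal holder mirroring the close / connected? pair.
class SocketHolder
  def initialize(socket)
    @socket = socket
  end

  # True as long as a socket object is held, regardless of its actual state.
  def connected?
    !@socket.nil?
  end

  # Closes the socket if present, then forgets it.
  def close
    @socket.close if @socket
    @socket = nil
  end
end

io = StringIO.new
holder = SocketHolder.new(io)
before = holder.connected?
holder.close
holder.close # second call is a no-op because @socket is already nil
after = holder.connected?
```

Under this design, calling `close` repeatedly is safe, and a later `request` can detect the missing socket and reconnect.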

#request(api_key, request, response_class) ⇒ Object

Sends a request over the connection.

Parameters:

  • api_key (Integer)

    the integer code for the API that is invoked.

  • request (#encode)

    the request that should be encoded and written.

  • response_class (#decode)

    an object that can decode the response.

Returns:

  • (Object)

    the response that was decoded by response_class, or nil if response_class is nil, in which case no response is read.



# File 'lib/kafka/connection.rb', line 83

def request(api_key, request, response_class)
  connect unless connected?

  write_request(api_key, request)

  unless response_class.nil?
    loop do
      correlation_id, response = read_response(response_class)

      # There may have been a previous request that timed out before the client
      # was able to read the response. In that case, the response will still be
      # sitting in the socket waiting to be read. If the response we just read
      # was to a previous request, we can safely skip it.
      if correlation_id < @correlation_id
        @logger.error "Received out-of-order response id #{correlation_id}, was expecting #{@correlation_id}"
      elsif correlation_id > @correlation_id
        raise Kafka::Error, "Correlation id mismatch: expected #{@correlation_id} but got #{correlation_id}"
      else
        break response
      end
    end
  end
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
  @logger.error "Connection error: #{e}"

  close

  raise ConnectionError, "Connection error: #{e}"
end
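
The correlation id handling in `request` can be illustrated without a socket. In the sketch below, `await_response` is a hypothetical helper and `responses` is an in-memory array standing in for frames read from the broker; the real method reads and decodes each response from the connection instead.

```ruby
# Hypothetical helper: `responses` stands in for frames read off the socket,
# each a [correlation_id, payload] pair in arrival order.
def await_response(responses, expected_id)
  loop do
    correlation_id, payload = responses.shift
    raise EOFError, "no more responses" if correlation_id.nil?

    if correlation_id < expected_id
      # Stale reply to an earlier request that timed out; skip it.
      next
    elsif correlation_id > expected_id
      # A reply from the "future" indicates a protocol error.
      raise "Correlation id mismatch: expected #{expected_id} but got #{correlation_id}"
    else
      break payload
    end
  end
end

queued = [[1, "stale"], [2, "also stale"], [3, "fresh"]]
answer = await_response(queued, 3)
```

Stale replies are drained until the expected id arrives, so a request that timed out earlier does not desynchronize the requests that follow it.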

#to_s ⇒ Object



# File 'lib/kafka/connection.rb', line 44

def to_s
  "#{@host}:#{@port}"
end