Class: Kafka::Connection

Inherits:
Object
Defined in:
lib/kafka/connection.rb

Overview

A connection to a single Kafka broker.

Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently leader for the set of topic partitions you want to produce to or consume from.

Instrumentation

Connections emit a request.connection.kafka notification on each request. The following keys will be found in the payload:

  • :api — the name of the API being invoked.
  • :request_size — the number of bytes in the request.
  • :response_size — the number of bytes in the response.

The payload also carries :broker_host, the hostname of the broker, and the notification includes the duration of the request.
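
The payload shape described above can be sketched with a stand-in instrumenter. RecordingInstrumenter is hypothetical and not part of the library; it only assumes that the instrumenter appends the kafka namespace, times the block, and records the payload. The API name and byte counts are made-up values.

```ruby
# Hypothetical stand-in for the instrumenter handed to a connection. It only
# mimics the behavior described above and is not the library's own class.
class RecordingInstrumenter
  Event = Struct.new(:name, :payload, :duration)

  attr_reader :events

  def initialize(namespace: "kafka")
    @namespace = namespace
    @events = []
  end

  # Times the block, then records the namespaced event name, the payload,
  # and the elapsed time in seconds (even if the block raises).
  def instrument(name, payload = {})
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield payload
  ensure
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @events << Event.new("#{name}.#{@namespace}", payload, duration)
  end
end

instrumenter = RecordingInstrumenter.new

# The sizes start at zero and are filled in while the request is in flight.
payload = { api: :fetch, request_size: 0, response_size: 0 }
instrumenter.instrument("request.connection", payload) do |p|
  p[:request_size] = 42    # illustrative values only
  p[:response_size] = 128
end

event = instrumenter.events.first
puts "#{event.name}: #{event.payload.inspect}"
```

A subscriber would read the same keys from the payload it receives, so starting the sizes at zero keeps the payload well-formed even when a request fails before any bytes are written.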

Constant Summary

SOCKET_TIMEOUT = 10
CONNECT_TIMEOUT = 10

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection

Opens a connection to a Kafka broker.

Parameters:

  • host (String)

    the hostname of the broker.

  • port (Integer)

    the port of the broker.

  • client_id (String)

    the client id is a user-specified string sent in each request to help trace calls and should logically identify the application making the request.

  • logger (Logger)

    the logger used to log trace messages.

  • connect_timeout (Integer) (defaults to: nil)

    the socket timeout for connecting to the broker. Default is 10 seconds.

  • socket_timeout (Integer) (defaults to: nil)

    the socket timeout for reading and writing to the broker. Default is 10 seconds.

  • instrumenter (#instrument)

    the object used to emit the request.connection.kafka notification.

  • ssl_context (OpenSSL::SSL::SSLContext, nil) (defaults to: nil)

    an optional SSL context for connecting to the broker over SSL.



# File 'lib/kafka/connection.rb', line 48

def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
  @host, @port, @client_id = host, port, client_id
  @logger = logger
  @instrumenter = instrumenter

  @connect_timeout = connect_timeout || CONNECT_TIMEOUT
  @socket_timeout = socket_timeout || SOCKET_TIMEOUT
  @ssl_context = ssl_context
end
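
The timeout defaulting in the constructor can be tried in isolation. TimeoutConfig below is a hypothetical stand-in that copies only the two fallback lines from the source; it is not a class in the library.

```ruby
# Hypothetical stand-in that mirrors only the timeout defaulting shown above.
class TimeoutConfig
  SOCKET_TIMEOUT = 10
  CONNECT_TIMEOUT = 10

  attr_reader :connect_timeout, :socket_timeout

  def initialize(connect_timeout: nil, socket_timeout: nil)
    # nil (or an omitted argument) falls back to the constant.
    @connect_timeout = connect_timeout || CONNECT_TIMEOUT
    @socket_timeout = socket_timeout || SOCKET_TIMEOUT
  end
end

defaults = TimeoutConfig.new
custom = TimeoutConfig.new(connect_timeout: 3, socket_timeout: 30)

puts defaults.connect_timeout # prints 10
puts custom.connect_timeout   # prints 3
puts custom.socket_timeout    # prints 30
```

Because the fallback uses ||, passing nil explicitly behaves the same as leaving the argument out.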

Instance Attribute Details

#decoder ⇒ Object (readonly)

Returns the value of attribute decoder.



# File 'lib/kafka/connection.rb', line 32

def decoder
  @decoder
end

#encoder ⇒ Object (readonly)

Returns the value of attribute encoder.



# File 'lib/kafka/connection.rb', line 31

def encoder
  @encoder
end

Instance Method Details

#close ⇒ Object

Closes the socket to the broker, if one is open, and discards it.



# File 'lib/kafka/connection.rb', line 66

def close
  @logger.debug "Closing socket to #{to_s}"

  @socket.close if @socket

  @socket = nil
end

#open? ⇒ Boolean

Whether the connection currently holds a socket that has not been closed.

Returns:

  • (Boolean)


# File 'lib/kafka/connection.rb', line 62

def open?
  !@socket.nil? && !@socket.closed?
end
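
The interplay between #open? and #close can be sketched without a broker. SocketHolder is a hypothetical stand-in, and a StringIO plays the role of the socket because it responds to close and closed? in the same way; only the bodies of open? and close are taken from the source.

```ruby
require "stringio"

# Hypothetical stand-in: a StringIO plays the role of the broker socket.
class SocketHolder
  def initialize
    @socket = nil
  end

  # Stand-in for opening a real socket.
  def open
    @socket = StringIO.new
  end

  # Same check as in the source: a socket exists and has not been closed.
  def open?
    !@socket.nil? && !@socket.closed?
  end

  # Same steps as in the source, minus the log line.
  def close
    @socket.close if @socket
    @socket = nil
  end
end

holder = SocketHolder.new
puts holder.open? # prints false, no socket yet

holder.open
puts holder.open? # prints true

holder.close
holder.close      # a second close is a no-op because @socket is nil
puts holder.open? # prints false
```

Resetting @socket to nil after closing is what makes a repeated close safe and lets a later request reopen the connection.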

#send_request(request) ⇒ Object

Sends a request over the connection.

Parameters:

  • request (#encode, #response_class)

    the request that should be encoded and written.

Returns:

  • (Object)

    the response.



# File 'lib/kafka/connection.rb', line 80

def send_request(request)
  # Default notification payload.
  notification = {
    broker_host: @host,
    api: Protocol.api_name(request.api_key),
    request_size: 0,
    response_size: 0,
  }

  @instrumenter.instrument("request.connection", notification) do
    open unless open?

    @correlation_id += 1

    write_request(request, notification)

    response_class = request.response_class
    wait_for_response(response_class, notification) unless response_class.nil?
  end
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
  close

  raise ConnectionError, "Connection error: #{e}"
end
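
The error handling at the end of send_request follows a rescue, close, re-raise pattern. The sketch below isolates that pattern; FlakyConnection and this ConnectionError are hypothetical stand-ins defined here so the example runs without the library, and the simulated failure is an assumption.

```ruby
# Hypothetical stand-ins; only the rescue-close-reraise pattern mirrors the source.
class ConnectionError < StandardError; end

class FlakyConnection
  attr_reader :closed_count

  def initialize
    @closed_count = 0
  end

  # Counts calls so the example can show that close ran.
  def close
    @closed_count += 1
  end

  # Simulates a request whose write fails with a low-level socket error.
  def send_request
    raise Errno::ECONNRESET
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, EOFError => e
    # Drop the broken socket, then surface a single library-level error.
    close
    raise ConnectionError, "Connection error: #{e}"
  end
end

conn = FlakyConnection.new
begin
  conn.send_request
rescue ConnectionError => e
  puts e.message
end
puts conn.closed_count # prints 1
```

Callers then only need to rescue one error class, and since the socket has already been closed, the next request on the same object can open a fresh connection.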

#to_s ⇒ Object

Returns the broker address as a host:port string.



# File 'lib/kafka/connection.rb', line 58

def to_s
  "#{@host}:#{@port}"
end