Class: Kafka::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/connection.rb

Overview

A connection to a single Kafka broker.

Usually you'll need a separate connection to each broker in a cluster, since most requests must be directed specifically to the broker that is currently leader for the set of topic partitions you want to produce to or consume from.

Instrumentation

Connections emit a request.connection.kafka notification on each request. The following keys will be found in the payload:

  • :api — the name of the API being invoked.
  • :request_size — the number of bytes in the request.
  • :response_size — the number of bytes in the response.

The notification also includes the duration of the request.

Constant Summary collapse

SOCKET_TIMEOUT =
10
CONNECT_TIMEOUT =
10
IDLE_TIMEOUT =

Time after which an idle connection will be reopened.

60 * 5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil) ⇒ Connection

Opens a connection to a Kafka broker.


53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/kafka/connection.rb', line 53

def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
  @host, @port, @client_id = host, port, client_id
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter

  @connect_timeout = connect_timeout || CONNECT_TIMEOUT
  @socket_timeout = socket_timeout || SOCKET_TIMEOUT
  @ssl_context = ssl_context

  @socket = nil
  @last_request = nil
end

Instance Attribute Details

#decoderObject (readonly)

Returns the value of attribute decoder


37
38
39
# File 'lib/kafka/connection.rb', line 37

def decoder
  @decoder
end

#encoderObject (readonly)

Returns the value of attribute encoder


36
37
38
# File 'lib/kafka/connection.rb', line 36

def encoder
  @encoder
end

Instance Method Details

#closeObject


74
75
76
77
78
# File 'lib/kafka/connection.rb', line 74

def close
  @logger.debug "Closing socket to #{to_s}"

  @socket.close if @socket
end

#open?Boolean


70
71
72
# File 'lib/kafka/connection.rb', line 70

def open?
  !@socket.nil? && !@socket.closed?
end

#send_request(request) ⇒ Object

Sends a request over the connection.


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/kafka/connection.rb', line 86

def send_request(request)
  api_name = Protocol.api_name(request.api_key)

  # Default notification payload.
  notification = {
    broker_host: @host,
    api: api_name,
    request_size: 0,
    response_size: 0,
  }

  raise IdleConnection if idle?

  @logger.push_tags(api_name)
  @instrumenter.instrument("request.connection", notification) do
    open unless open?

    @correlation_id += 1

    @logger.debug "Sending #{api_name} API request #{@correlation_id} to #{to_s}"

    write_request(request, notification)

    response_class = request.response_class
    response = wait_for_response(response_class, notification) unless response_class.nil?

    @last_request = Time.now

    response
  end
rescue SystemCallError, EOFError, IOError => e
  close

  raise ConnectionError, "Connection error #{e.class}: #{e}"
ensure
  @logger.pop_tags
end

#to_sObject


66
67
68
# File 'lib/kafka/connection.rb', line 66

def to_s
  "#{@host}:#{@port}"
end