Class: CC::Kafka::Producer::HTTP

Inherits:
Object
  • Object
show all
Defined in:
lib/cc/kafka/producer/http.rb

Constant Summary collapse

HTTPError =
Class.new(StandardError)
HTTP_TIMEOUT =

seconds

60

Instance Method Summary collapse

Constructor Details

#initialize(host, port, topic, ssl = false) ⇒ HTTP

Returns a new instance of HTTP.



8
9
10
11
12
13
# File 'lib/cc/kafka/producer/http.rb', line 8

def initialize(host, port, topic, ssl = false)
  @host = host
  @port = port
  @topic = topic
  @ssl = ssl
end

Instance Method Details

#closeObject



39
40
41
# File 'lib/cc/kafka/producer/http.rb', line 39

def close
  # no-op
end

#send_message(message, key) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/cc/kafka/producer/http.rb', line 15

def send_message(message, key)
  Kafka.logger.debug("sending message over HTTP")
  http = Net::HTTP.new(@host, @port)
  http.open_timeout = HTTP_TIMEOUT
  http.read_timeout = HTTP_TIMEOUT

  if ssl?
    http.use_ssl = true
    http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    add_ssl_certificates(http)
  end

  request = Net::HTTP::Post.new("/")
  request["Topic"] = @topic
  request["Key"] = key if key
  request.body = message

  response = http.request(request)

  unless response.is_a?(Net::HTTPSuccess)
    raise HTTPError, "request not successful: (#{response.code}) #{response.body}"
  end
end