Class: CC::Kafka::Producer::HTTP
- Inherits: Object
  - Object
    - CC::Kafka::Producer::HTTP
- Defined in: lib/cc/kafka/producer/http.rb
Constant Summary
- HTTPError = Class.new(StandardError)
- HTTP_TIMEOUT = 60 # seconds
Instance Method Summary
- #close ⇒ Object
- #initialize(host, port, topic, ssl = false) ⇒ HTTP (constructor): A new instance of HTTP.
- #send_message(message, key) ⇒ Object
Constructor Details
#initialize(host, port, topic, ssl = false) ⇒ HTTP
Returns a new instance of HTTP.
# File 'lib/cc/kafka/producer/http.rb', line 8

def initialize(host, port, topic, ssl = false)
  @host = host
  @port = port
  @topic = topic
  @ssl = ssl
end
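The constructor only stores its arguments; the connection settings are applied later, inside #send_message. The following is a minimal sketch of how the `ssl` flag and `HTTP_TIMEOUT` appear to map onto `Net::HTTP` settings. The helper `build_connection` and the host name are hypothetical and not part of the gem, and the call to the private `add_ssl_certificates` helper is left out because its source is not shown here.

```ruby
require "net/http"
require "openssl"

# Hypothetical helper (not part of the gem): configures a Net::HTTP object
# the way #send_message does, without opening a connection.
def build_connection(host, port, ssl: false, timeout: 60)
  http = Net::HTTP.new(host, port) # no socket is opened at this point
  http.open_timeout = timeout      # seconds allowed to establish the connection
  http.read_timeout = timeout      # seconds allowed for each read from the socket
  if ssl
    http.use_ssl = true
    http.verify_mode = OpenSSL::SSL::VERIFY_PEER # reject unverifiable certificates
  end
  http
end

# Example host and ports are assumptions for illustration only.
plain  = build_connection("kafka-proxy.example.com", 8080)
secure = build_connection("kafka-proxy.example.com", 8443, ssl: true)
```

Because `Net::HTTP.new` does not connect, constructing the producer with a wrong host or port would only surface as an error on the first #send_message call.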
Instance Method Details
#close ⇒ Object
# File 'lib/cc/kafka/producer/http.rb', line 39

def close
  # no-op
end
#send_message(message, key) ⇒ Object
# File 'lib/cc/kafka/producer/http.rb', line 15

def send_message(message, key)
  Kafka.logger.debug("sending message over HTTP")
  http = Net::HTTP.new(@host, @port)
  http.open_timeout = HTTP_TIMEOUT
  http.read_timeout = HTTP_TIMEOUT
  if ssl?
    http.use_ssl = true
    http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    add_ssl_certificates(http)
  end
  request = Net::HTTP::Post.new("/")
  request["Topic"] = @topic
  request["Key"] = key if key
  request.body = message
  response = http.request(request)
  unless response.is_a?(Net::HTTPSuccess)
    raise HTTPError, "request not successful: (#{response.code}) #{response.body}"
  end
end
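To make the wire format and the failure condition concrete, here is a hedged sketch that rebuilds the request and the success check from the listing above using only the standard library. `build_request` and `check_response!` are hypothetical helper names, the top-level `HTTPError` is a stand-in for `CC::Kafka::Producer::HTTP::HTTPError`, and the topic, key, and payload are made up. The responses are built by hand, so the error message omits the response body that the real method includes.

```ruby
require "net/http"

# Stand-in for CC::Kafka::Producer::HTTP::HTTPError so the sketch runs on its own.
HTTPError = Class.new(StandardError)

# Hypothetical helper: builds the POST that #send_message sends.
# The topic and the optional key travel as headers; the message is the body.
def build_request(topic, message, key = nil)
  request = Net::HTTP::Post.new("/")
  request["Topic"] = topic
  request["Key"] = key if key # header is omitted entirely when key is nil
  request.body = message
  request
end

# Hypothetical helper: mirrors the success check in #send_message.
# Any 2xx response passes; everything else raises, including redirects.
def check_response!(response)
  return if response.is_a?(Net::HTTPSuccess)

  raise HTTPError, "request not successful: (#{response.code})"
end

request = build_request("events", '{"id":1}', "user-42")
no_key  = build_request("events", '{"id":1}')

# Hand-built responses, used here only to exercise the class check.
no_content = Net::HTTPNoContent.new("1.1", "204", "No Content")
redirect   = Net::HTTPMovedPermanently.new("1.1", "301", "Moved Permanently")

check_response!(no_content) # 204 is a Net::HTTPSuccess, so nothing is raised

begin
  check_response!(redirect) # 301 is not a 2xx response
rescue HTTPError => e
  error_message = e.message
end
```

Callers of #send_message would therefore rescue `HTTPError` for any non-2xx reply. Timeouts and connection failures raised by `Net::HTTP` itself are not wrapped by the listing above and would need separate handling.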