Class: CC::Kafka::Producer
- Inherits: Object
  - Object
  - CC::Kafka::Producer
- Defined in: lib/cc/kafka/producer.rb, lib/cc/kafka/producer/http.rb, lib/cc/kafka/producer/poseidon.rb
Constant Summary
- InvalidScheme = Class.new(StandardError)
Instance Method Summary
- #close ⇒ Object
- #initialize(url, client_id = nil) ⇒ Producer (constructor): A new instance of Producer.
- #send_message(data, key = nil) ⇒ Object
- #send_snapshot_document(collection:, document:, snapshot_id:) ⇒ Object
Constructor Details
Instance Method Details
#close ⇒ Object
    # File 'lib/cc/kafka/producer.rb', line 36
    def close
      producer.close
    end
#send_message(data, key = nil) ⇒ Object
    # File 'lib/cc/kafka/producer.rb', line 17
    def send_message(data, key = nil)
      Kafka.logger.debug("data: #{data.inspect}, key: #{key.inspect}")
      producer.send_message(BSON.serialize(data).to_s, key)
    rescue
      producer.close
      raise
    end
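The rescue clause in #send_message closes the underlying producer and then re-raises, so the caller still sees the original error. A minimal sketch of that pattern follows. `FailingDelivery` and `deliver` are hypothetical stand-ins invented for illustration; they are not part of CC::Kafka, and BSON serialization is left out.

```ruby
# Hypothetical stand-in for the underlying delivery object (not part of CC::Kafka).
class FailingDelivery
  attr_reader :closed

  def initialize
    @closed = false
  end

  # Always fails, to exercise the error path.
  def send_message(_payload, _key)
    raise IOError, "broker unreachable"
  end

  def close
    @closed = true
  end
end

# Same shape as Producer#send_message: on any StandardError,
# close the connection, then re-raise the original exception.
def deliver(delivery, payload, key = nil)
  delivery.send_message(payload, key)
rescue
  delivery.close
  raise
end

delivery = FailingDelivery.new
error_message = nil
begin
  deliver(delivery, "payload", "key-1")
rescue IOError => e
  error_message = e.message
end
```

In this sketch the stand-in is closed by the time the caller's own rescue runs, and the exception the caller receives is the one raised by the failed send.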
#send_snapshot_document(collection:, document:, snapshot_id:) ⇒ Object
    # File 'lib/cc/kafka/producer.rb', line 26
    def send_snapshot_document(collection:, document:, snapshot_id:)
      snapshot_id = BSON::ObjectId(snapshot_id) if snapshot_id.is_a?(String)
      data = {
        type: "document",
        collection: collection,
        document: document.merge(snapshot_id: snapshot_id)
      }
      send_message(data, snapshot_id.to_s)
    end
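To make the message shape concrete, the sketch below rebuilds the hash and key that #send_snapshot_document hands to #send_message. `snapshot_message` is a hypothetical helper, and the collection name, document, and id are made-up sample values. As a stated assumption, the BSON::ObjectId conversion is skipped so the example runs without the bson gem; the id stays a plain String here.

```ruby
# Hypothetical helper mirroring the hash built in send_snapshot_document.
# Assumption: the BSON::ObjectId conversion is omitted, so snapshot_id
# remains whatever the caller passed in (a String in this example).
def snapshot_message(collection:, document:, snapshot_id:)
  data = {
    type: "document",
    collection: collection,
    # Hash#merge returns a new hash; the caller's document is not mutated.
    document: document.merge(snapshot_id: snapshot_id)
  }
  # The snapshot id, as a string, doubles as the message key.
  [data, snapshot_id.to_s]
end

original = { name: "example" }
data, key = snapshot_message(
  collection: "repos",
  document: original,
  snapshot_id: "507f1f77bcf86cd799439011"
)
```

Because the snapshot id is used as the key, every document sent for the same snapshot carries the same key.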