Class: CC::Kafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/cc/kafka/producer.rb,
lib/cc/kafka/producer/http.rb,
lib/cc/kafka/producer/poseidon.rb

Defined Under Namespace

Classes: HTTP, Poseidon

Constant Summary collapse

InvalidScheme =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(url, client_id = nil) ⇒ Producer

Returns a new instance of Producer.



10
11
12
13
14
15
# File 'lib/cc/kafka/producer.rb', line 10

def initialize(url, client_id = nil)
  @url = url
  @client_id = client_id

  Kafka.logger.debug("initialized client for #{@url} (id: #{@client_id.inspect})")
end

Instance Method Details

#closeObject



36
37
38
# File 'lib/cc/kafka/producer.rb', line 36

def close
  producer.close
end

#send_message(data, key = nil) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/cc/kafka/producer.rb', line 17

def send_message(data, key = nil)
  Kafka.logger.debug("data: #{data.inspect}, key: #{key.inspect}")

  producer.send_message(BSON.serialize(data).to_s, key)
rescue
  producer.close
  raise
end

#send_snapshot_document(collection:, document:, snapshot_id:) ⇒ Object



26
27
28
29
30
31
32
33
34
# File 'lib/cc/kafka/producer.rb', line 26

def send_snapshot_document(collection:, document:, snapshot_id:)
  snapshot_id = BSON::ObjectId(snapshot_id) if snapshot_id.is_a?(String)
  data = {
    type: "document",
    collection: collection,
    document: document.merge(snapshot_id: snapshot_id)
  }
  send_message(data, snapshot_id.to_s)
end