Class: Rdkafka::Producer
- Inherits: Object
  - Object
    - Rdkafka::Producer
- Defined in: lib/rdkafka/producer.rb, lib/rdkafka/producer/delivery_handle.rb, lib/rdkafka/producer/delivery_report.rb
Defined Under Namespace
Classes: DeliveryHandle, DeliveryReport
Instance Method Summary
- #close ⇒ Object
  Close this producer and wait for the internal poll queue to empty.
- #produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil) ⇒ DeliveryHandle
  Produces a message to a Kafka topic.
Instance Method Details
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
# File 'lib/rdkafka/producer.rb', line 22
def close
  # Indicate to polling thread that we're closing
  @closing = true
  # Wait for the polling thread to finish up
  @polling_thread.join
end
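The close-then-join pattern used by #close can be tried in isolation. The following is a minimal sketch, not the rdkafka implementation: SketchPoller, its in-memory queue, and the sleep interval are all hypothetical stand-ins for the native poll loop. It shows why setting a flag and then joining the thread lets pending work drain before close returns.

```ruby
# Hypothetical stand-in for a producer; not part of rdkafka.
class SketchPoller
  attr_reader :processed

  def initialize
    @closing = false
    @queue = Queue.new
    @processed = []
    @polling_thread = Thread.new do
      loop do
        begin
          # Non-blocking pop raises ThreadError when the queue is empty
          @processed << @queue.pop(true)
        rescue ThreadError
          # Stop only once closing was requested and nothing is left
          break if @closing
          sleep 0.01
        end
      end
    end
  end

  def enqueue(item)
    @queue << item
  end

  def close
    # Indicate to the polling thread that we're closing
    @closing = true
    # Wait for the polling thread to drain the queue and exit
    @polling_thread.join
  end
end

poller = SketchPoller.new
3.times { |i| poller.enqueue(i) }
poller.close
puts poller.processed.inspect
```

Because close blocks on the join, anything enqueued before the call has been handled by the time it returns.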
#produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue; call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified, the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition is used. When a timestamp is provided, it is used instead of the autogenerated timestamp.
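The partition rules above can be illustrated with a small pure-Ruby sketch. It is not how the library is implemented: sketch_partition_for is a hypothetical helper, and the hash function (CRC32 via Ruby's Zlib) is an assumption chosen for illustration. The real choice is made inside the underlying Kafka library, not in Ruby.

```ruby
require "zlib"

# Hypothetical helper for illustration only.
def sketch_partition_for(key, partition_count, partition: nil)
  # An explicitly requested partition always wins
  return partition unless partition.nil?
  # No key: any partition may be chosen
  return rand(partition_count) if key.nil?
  # Keyed message: hash the key so equal keys map to the same partition
  Zlib.crc32(key) % partition_count
end

# Equal keys land on the same partition
puts sketch_partition_for("user-42", 6) == sketch_partition_for("user-42", 6)
# An explicit partition is returned unchanged
puts sketch_partition_for("user-42", 6, partition: 3)
```

The practical consequence is that messages sharing a key keep their relative order within one partition, while keyless messages are spread across partitions.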
# File 'lib/rdkafka/producer.rb', line 43
def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil)
  # Start by checking and converting the input

  # Get payload length
  payload_size = if payload.nil?
                   0
                 else
                   payload.bytesize
                 end

  # Get key length
  key_size = if key.nil?
               0
             else
               key.bytesize
             end

  # If partition is nil use -1 to let Kafka set the partition based
  # on the key/randomly if there is no key
  partition = -1 if partition.nil?

  # If timestamp is nil use 0 and let Kafka set one
  timestamp = 0 if timestamp.nil?

  delivery_handle = DeliveryHandle.new
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle.to_ptr.address, delivery_handle)

  # Produce the message
  response = Rdkafka::Bindings.rd_kafka_producev(
    @native_kafka,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_END
  )

  # Raise error if the produce call was not successful
  if response != 0
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    raise RdkafkaError.new(response)
  end

  delivery_handle
end
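The register-then-remove-on-failure bookkeeping in the listing above can be sketched without any native bindings. Everything named here is hypothetical: SketchHandleRegistry, SketchProduceError, and sketch_produce are illustrative stand-ins, the handle is a plain Hash rather than a DeliveryHandle, and the block passed in plays the role of the native produce call by returning a response code. Whether the real registry uses a mutex is not shown in the source; it is an assumption of this sketch.

```ruby
# Hypothetical error type standing in for RdkafkaError.
class SketchProduceError < StandardError; end

# Hypothetical registry that keeps pending handles reachable by address.
class SketchHandleRegistry
  def initialize
    @handles = {}
    @mutex = Mutex.new
  end

  def register(address, handle)
    @mutex.synchronize { @handles[address] = handle }
  end

  def remove(address)
    @mutex.synchronize { @handles.delete(address) }
  end

  def size
    @mutex.synchronize { @handles.size }
  end
end

# The block stands in for the native call and returns a response code.
def sketch_produce(registry)
  handle = { pending: true, response: -1, partition: -1, offset: -1 }
  address = handle.object_id
  # Register before the call so a later callback could find the handle
  registry.register(address, handle)
  response = yield
  if response != 0
    # Nothing will ever be delivered, so drop the handle again
    registry.remove(address)
    raise SketchProduceError, "produce failed with code #{response}"
  end
  handle
end

registry = SketchHandleRegistry.new
sketch_produce(registry) { 0 }
puts registry.size
```

Registering before the call and removing on a non-zero response keeps the registry from accumulating handles for messages that were never queued.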