Class: Rdkafka::Producer
- Inherits:
-
Object
- Object
- Rdkafka::Producer
- Defined in:
- lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb
Overview
Defined Under Namespace
Classes: DeliveryHandle, DeliveryReport
Instance Attribute Summary collapse
-
#delivery_callback ⇒ nil
writeonly
Set a callback that will be called every time a message is successfully produced.
Instance Method Summary collapse
- #arity(callback) ⇒ Object
- #call_delivery_callback(delivery_report, delivery_handle) ⇒ Object
-
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
-
#closed? ⇒ Boolean
Whether this producer has closed.
-
#flush(timeout_ms = 5_000) ⇒ Object
Wait until all outstanding producer requests are completed, with the given timeout in seconds.
-
#partition_count(topic) ⇒ Object
Partition count for a given topic.
-
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic.
Instance Attribute Details
#delivery_callback=(callback) ⇒ nil
Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport and DeliveryHandle
35 36 37 38 39 |
# File 'lib/rdkafka/producer.rb', line 35 def delivery_callback=(callback) raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) @delivery_callback = callback @delivery_callback_arity = arity(callback) end |
Instance Method Details
#arity(callback) ⇒ Object
189 190 191 192 193 |
# File 'lib/rdkafka/producer.rb', line 189 def arity(callback) return callback.arity if callback.respond_to?(:arity) callback.method(:call).arity end |
#call_delivery_callback(delivery_report, delivery_handle) ⇒ Object
182 183 184 185 186 187 |
# File 'lib/rdkafka/producer.rb', line 182 def call_delivery_callback(delivery_report, delivery_handle) return unless @delivery_callback args = [delivery_report, delivery_handle].take(@delivery_callback_arity) @delivery_callback.call(*args) end |
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
42 43 44 45 46 |
# File 'lib/rdkafka/producer.rb', line 42 def close return if closed? ObjectSpace.undefine_finalizer(self) @native_kafka.close end |
#closed? ⇒ Boolean
Whether this producer has closed
49 50 51 |
# File 'lib/rdkafka/producer.rb', line 49 def closed? @native_kafka.closed? end |
#flush(timeout_ms = 5_000) ⇒ Object
Wait until all outstanding producer requests are completed, with the given timeout in seconds. Call this before closing a producer to ensure delivery of all messages.
57 58 59 60 61 62 63 |
# File 'lib/rdkafka/producer.rb', line 57 def flush(timeout_ms=5_000) closed_producer_check(__method__) @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms) end end |
#partition_count(topic) ⇒ Object
Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
71 72 73 74 75 76 |
# File 'lib/rdkafka/producer.rb', line 71 def partition_count(topic) closed_producer_check(__method__) @native_kafka.with_inner do |inner| Rdkafka::Metadata.new(inner, topic).topics&.first[:partition_count] end end |
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/rdkafka/producer.rb', line 94 def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) closed_producer_check(__method__) # Start by checking and converting the input # Get payload length payload_size = if payload.nil? 0 else payload.bytesize end # Get key length key_size = if key.nil? 0 else key.bytesize end if partition_key partition_count = partition_count(topic) # If the topic is not present, set to -1 partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count end # If partition is nil, use -1 to let librdafka set the partition randomly or # based on the key when present. partition ||= -1 # If timestamp is nil use 0 and let Kafka set one. If an integer or time # use it. = if .nil? 0 elsif .is_a?(Integer) elsif .is_a?(Time) (.to_i * 1000) + (.usec / 1000) else raise TypeError.new("Timestamp has to be nil, an Integer or a Time") end delivery_handle = DeliveryHandle.new delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 DeliveryHandle.register(delivery_handle) args = [ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, , :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle, ] if headers headers.each do |key0, value0| key = key0.to_s value = value0.to_s args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER args << :string << key args << :pointer << value args << :size_t << value.bytes.size end end args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END # Produce the message response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_producev( inner, *args ) end # Raise error if the produce call was not successful if response != 0 DeliveryHandle.remove(delivery_handle.to_ptr.address) raise RdkafkaError.new(response) end delivery_handle end |