Class: Rdkafka::Producer
- Inherits:
-
Object
- Object
- Rdkafka::Producer
- Defined in:
- lib/rdkafka/producer.rb,
lib/rdkafka/producer/client.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb
Overview
Defined Under Namespace
Classes: Client, DeliveryHandle, DeliveryReport
Instance Attribute Summary collapse
-
#delivery_callback ⇒ nil
writeonly
Set a callback that will be called every time a message is successfully produced.
Instance Method Summary collapse
- #arity(callback) ⇒ Object
-
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
- #closed_producer_check(method) ⇒ Object
-
#partition_count(topic) ⇒ Object
Partition count for a given topic.
-
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic.
Instance Attribute Details
#delivery_callback=(callback) ⇒ nil
Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport and a DeliveryHandle.
33 34 35 36 37 |
# File 'lib/rdkafka/producer.rb', line 33

# Stores the callback to run every time a message is successfully produced,
# together with its arity.
#
# @param callback [#call] any object that responds to +call+
# @raise [TypeError] when +callback+ does not respond to +call+
def delivery_callback=(callback)
  unless callback.respond_to?(:call)
    raise TypeError.new("Callback has to be callable")
  end

  @delivery_callback = callback
  # Cache the arity now so it does not have to be looked up again later.
  @delivery_callback_arity = arity(callback)
end
Instance Method Details
#arity(callback) ⇒ Object
168 169 170 171 172 |
# File 'lib/rdkafka/producer.rb', line 168

# Reports how many arguments a callback accepts.
#
# Objects that expose +arity+ themselves are asked directly; any other
# callable is inspected through its +call+ method.
#
# @param callback [#call] the callable to inspect
# @return [Integer] the arity of the callback
def arity(callback)
  arity_source = callback.respond_to?(:arity) ? callback : callback.method(:call)
  arity_source.arity
end
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
40 41 42 43 44 |
# File 'lib/rdkafka/producer.rb', line 40

# Close this producer and wait for the internal poll queue to empty.
#
# @return [Object] whatever the underlying client's +close+ returns
def close
  # Drop any finalizer registered for this producer before closing it
  # explicitly.
  ObjectSpace.undefine_finalizer(self)

  @client.close
end
#closed_producer_check(method) ⇒ Object
174 175 176 |
# File 'lib/rdkafka/producer.rb', line 174

# Guards a public operation against being used on a closed producer.
#
# @param method [Symbol] name of the calling method, passed on to the error
# @return [nil] when the underlying client is still open
# @raise [Rdkafka::ClosedProducerError] when the underlying client is closed
def closed_producer_check(method)
  return unless @client.closed?

  raise Rdkafka::ClosedProducerError.new(method)
end
#partition_count(topic) ⇒ Object
Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
53 54 55 56 |
# File 'lib/rdkafka/producer.rb', line 53

# Partition count for a given topic.
#
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic
# will be auto-created after returning nil.
#
# @param topic [String] name of the topic to look up
# @return [Integer, nil] the partition count, or nil when the metadata holds
#   no entry for the topic
# @raise [Rdkafka::ClosedProducerError] when the producer has been closed
def partition_count(topic)
  closed_producer_check(__method__)

  topic_metadata = Rdkafka::Metadata.new(@client.native, topic).topics&.first
  # `&.` does not short-circuit the rest of a call chain in Ruby, so indexing
  # directly after `topics&.first` would raise NoMethodError on nil. Return nil
  # explicitly when there is no topic entry.
  return nil if topic_metadata.nil?

  topic_metadata[:partition_count]
end
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue; call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/rdkafka/producer.rb', line 74

# Produces a message to a Kafka topic. The message is added to rdkafka's
# queue; call wait on the returned delivery handle to make sure it is
# delivered.
#
# When no partition is specified the underlying Kafka library picks a
# partition based on the key. If no key is specified, a random partition will
# be used. When a timestamp is provided this is used instead of the
# auto-generated timestamp.
#
# @param topic [String] the topic to produce to
# @param payload [String, nil] the message payload
# @param key [String, nil] the message key
# @param partition [Integer, nil] explicit target partition
# @param partition_key [String, nil] when given, the partition is derived from
#   this value and the topic's partition count
# @param timestamp [Time, Integer, nil] message timestamp; an Integer is
#   passed through unchanged, a Time is converted to milliseconds
# @param headers [Hash, nil] message headers; names and values are converted
#   with +to_s+
# @return [DeliveryHandle] handle for the queued message
# @raise [Rdkafka::ClosedProducerError] when the producer has been closed
# @raise [TypeError] when +timestamp+ is not nil, an Integer or a Time
# @raise [RdkafkaError] when the native produce call reports a non-zero response
def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil)
  closed_producer_check(__method__)

  # Start by checking and converting the input

  # Payload and key lengths in bytes; a missing value has length 0.
  payload_size = payload.nil? ? 0 : payload.bytesize
  key_size = key.nil? ? 0 : key.bytesize

  if partition_key
    partition_count = partition_count(topic)
    # If the topic is not present the partition is left untouched, so it
    # falls through to -1 below unless the caller passed one explicitly.
    partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count
  end

  # If partition is nil, use -1 to let librdkafka set the partition randomly or
  # based on the key when present.
  partition ||= -1

  # If timestamp is nil use 0 and let Kafka set one. If an integer or time
  # use it.
  raw_timestamp = if timestamp.nil?
                    0
                  elsif timestamp.is_a?(Integer)
                    timestamp
                  elsif timestamp.is_a?(Time)
                    # Whole seconds plus the sub-second part, in milliseconds.
                    (timestamp.to_i * 1000) + (timestamp.usec / 1000)
                  else
                    raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
                  end

  delivery_handle = DeliveryHandle.new
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle)

  # Variadic argument list for rd_kafka_producev: each entry is an FFI type
  # followed by its value, grouped per RD_KAFKA_VTYPE_* marker.
  args = [
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
  ]

  if headers
    headers.each do |header_name, header_value|
      # Dedicated locals so the +key+ parameter is not overwritten.
      name = header_name.to_s
      value = header_value.to_s
      args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER
      args << :string << name
      args << :pointer << value
      args << :size_t << value.bytesize
    end
  end

  args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END

  # Produce the message
  response = Rdkafka::Bindings.rd_kafka_producev(
    @client.native,
    *args
  )

  # Raise error if the produce call was not successful
  if response != 0
    # The message was never queued, so unregister its handle before raising.
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    raise RdkafkaError.new(response)
  end

  delivery_handle
end