Class: Rdkafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb

Overview

A producer for Kafka messages. To create a producer set up a Config and call producer on that.

Defined Under Namespace

Classes: DeliveryHandle, DeliveryReport

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#delivery_callback=(callback) ⇒ nil

Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport

Parameters:

  • callback (Proc, #call)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)

37
38
39
40
# File 'lib/rdkafka/producer.rb', line 37

def delivery_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
  @delivery_callback = callback
end

Instance Method Details

#closeObject

Close this producer and wait for the internal poll queue to empty.


43
44
45
46
47
48
49
50
51
52
# File 'lib/rdkafka/producer.rb', line 43

def close
  return unless @native_kafka

  # Indicate to polling thread that we're closing
  @closing = true
  # Wait for the polling thread to finish up
  @polling_thread.join
  Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
  @native_kafka = nil
end

#closed_producer_check(method) ⇒ Object


172
173
174
# File 'lib/rdkafka/producer.rb', line 172

def closed_producer_check(method)
  raise Rdkafka::ClosedProducerError.new(method) if @native_kafka.nil?
end

#partition_count(topic) ⇒ Object

Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.

Parameters:

  • topic (String)

    The topic name.

Returns:

  • partition count [Integer,nil]


61
62
63
64
# File 'lib/rdkafka/producer.rb', line 61

def partition_count(topic)
  closed_producer_check(__method__)
  Rdkafka::Metadata.new(@native_kafka, topic).topics&.first[:partition_count]
end

#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle

Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.

When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.

Parameters:

  • topic (String)

    The topic to produce to

  • payload (String, nil) (defaults to: nil)

    The message's payload

  • key (String) (defaults to: nil)

    The message's key

  • partition (Integer, nil) (defaults to: nil)

    Optional partition to produce to

  • timestamp (Time, Integer, nil) (defaults to: nil)

    Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.

  • headers (Hash<String,String>) (defaults to: nil)

    Optional message headers

Returns:

  • (DeliveryHandle)

    Delivery handle that can be used to wait for the result of producing this message

Raises:

  • (RdkafkaError)

    When adding the message to rdkafka's queue failed


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/rdkafka/producer.rb', line 81

def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil)
  closed_producer_check(__method__)

  # Start by checking and converting the input

  # Get payload length
  payload_size = if payload.nil?
                   0
                 else
                   payload.bytesize
                 end

  # Get key length
  key_size = if key.nil?
               0
             else
               key.bytesize
             end

  if partition_key
    partition_count = partition_count(topic)
    # If the topic is not present, set to -1
    partition = Rdkafka::Bindings.partitioner(partition_key, partition_count) if partition_count
  end

  # If partition is nil, use -1 to let librdafka set the partition randomly or
  # based on the key when present.
  partition ||= -1

  # If timestamp is nil use 0 and let Kafka set one. If an integer or time
  # use it.
  raw_timestamp = if timestamp.nil?
                    0
                  elsif timestamp.is_a?(Integer)
                    timestamp
                  elsif timestamp.is_a?(Time)
                    (timestamp.to_i * 1000) + (timestamp.usec / 1000)
                  else
                    raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
                  end

  delivery_handle = DeliveryHandle.new
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle)

  args = [
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
  ]

  if headers
    headers.each do |key0, value0|
      key = key0.to_s
      value = value0.to_s
      args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER
      args << :string << key
      args << :pointer << value
      args << :size_t << value.bytes.size
    end
  end

  args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END

  # Produce the message
  response = Rdkafka::Bindings.rd_kafka_producev(
    @native_kafka,
    *args
  )

  # Raise error if the produce call was not successful
  if response != 0
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    raise RdkafkaError.new(response)
  end

  delivery_handle
end