Class: Rdkafka::Producer

Inherits:
Object
Defined in:
lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb

Overview

A producer for Kafka messages. To create a producer, set up a Config and call producer on it.

Defined Under Namespace

Classes: DeliveryHandle, DeliveryReport

Instance Method Details

#close ⇒ Object

Close this producer and wait for the internal poll queue to empty.



# File 'lib/rdkafka/producer.rb', line 22

def close
  # Indicate to polling thread that we're closing
  @closing = true
  # Wait for the polling thread to finish up
  @polling_thread.join
end
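
The shutdown handshake in close can be sketched without Kafka at all. The class below is a hypothetical stand-in (PollingWorker, its queue, and enqueue are illustrative names, not part of rdkafka): a background thread drains a queue, and close sets a flag and then joins the thread, so it returns only after the queue is empty.

```ruby
# Hypothetical stand-in for the producer's polling loop; not rdkafka code.
class PollingWorker
  attr_reader :handled

  def initialize
    @queue = Queue.new   # pending events, standing in for the internal poll queue
    @handled = []        # events the polling thread has processed
    @closing = false
    @polling_thread = Thread.new do
      loop do
        begin
          # Non-blocking pop raises ThreadError when the queue is empty
          @handled << @queue.pop(true)
        rescue ThreadError
          # Exit only once closing was requested and nothing is left to drain
          break if @closing && @queue.empty?
          sleep 0.01
        end
      end
    end
  end

  def enqueue(event)
    @queue << event
  end

  def close
    # Indicate to the polling thread that we're closing
    @closing = true
    # Wait for the polling thread to drain the queue and exit
    @polling_thread.join
  end
end

worker = PollingWorker.new
3.times { |i| worker.enqueue("event-#{i}") }
worker.close
puts worker.handled.join(", ")  # prints "event-0, event-1, event-2"
```

Because close joins the thread, nothing enqueued before the call is lost, at the cost of blocking the caller until the queue is drained.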

#produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil) ⇒ DeliveryHandle

Produces a message to a Kafka topic. The message is only added to rdkafka's queue; call wait on the returned delivery handle to make sure it has been delivered.

When no partition is specified, the underlying Kafka library picks a partition based on the key. If no key is specified either, a random partition is used. When a timestamp is provided, it is used instead of the autogenerated timestamp.

Parameters:

  • topic (String)

    The topic to produce to

  • payload (String) (defaults to: nil)

    The message's payload

  • key (String) (defaults to: nil)

    The message's key

  • partition (Integer) (defaults to: nil)

    Optional partition to produce to

  • timestamp (Integer) (defaults to: nil)

    Optional timestamp of this message

Returns:

  • (DeliveryHandle)

    Delivery handle that can be used to wait for the result of producing this message

Raises:

  • (RdkafkaError)

    When adding the message to rdkafka's queue failed
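
A minimal sketch of the produce-then-wait flow described above. The helper produce_and_wait and the Stub classes are hypothetical and exist only so the example runs without a broker; the sketch also assumes that wait returns a report exposing partition and offset, as the DeliveryReport class suggests.

```ruby
# Hypothetical helper: produce one message and block until its delivery
# result is known. `producer` is anything that responds to #produce the way
# Rdkafka::Producer does.
def produce_and_wait(producer, topic:, payload:, key: nil)
  handle = producer.produce(topic: topic, payload: payload, key: key)
  # produce only enqueues the message; wait blocks until delivery is reported
  handle.wait
end

# Stand-ins that only mimic the shape of the real objects (not rdkafka code).
StubReport = Struct.new(:partition, :offset)

class StubHandle
  def initialize(report)
    @report = report
  end

  def wait
    @report
  end
end

class StubProducer
  def initialize
    @next_offset = 0
  end

  def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil)
    raise ArgumentError, "topic must be a String" unless topic.is_a?(String)
    # Pretend everything lands on partition 0 unless one was requested
    report = StubReport.new(partition || 0, @next_offset)
    @next_offset += 1
    StubHandle.new(report)
  end
end

report = produce_and_wait(StubProducer.new, topic: "events", payload: "hello", key: "user-1")
puts "partition=#{report.partition} offset=#{report.offset}"  # prints "partition=0 offset=0"
```

With the gem installed and a broker reachable, the stub would be replaced by a producer obtained from a Config, and the call would typically be wrapped in a rescue for RdkafkaError, which produce raises when the message cannot be queued.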



# File 'lib/rdkafka/producer.rb', line 43

def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil)
  # Start by checking and converting the input

  # Get payload length
  payload_size = if payload.nil?
                   0
                 else
                   payload.bytesize
                 end

  # Get key length
  key_size = if key.nil?
               0
             else
               key.bytesize
             end

  # If partition is nil use -1 to let Kafka set the partition based
  # on the key/randomly if there is no key
  partition = -1 if partition.nil?

  # If timestamp is nil use 0 and let Kafka set one
  timestamp = 0 if timestamp.nil?

  delivery_handle = DeliveryHandle.new
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle.to_ptr.address, delivery_handle)

  # Produce the message
  response = Rdkafka::Bindings.rd_kafka_producev(
    @native_kafka,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_END
  )

  # Raise error if the produce call was not successful
  if response != 0
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    raise RdkafkaError.new(response)
  end

  delivery_handle
end
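
The input normalisation at the top of produce can be looked at in isolation. The helper below is a hypothetical extraction (normalize_produce_args is not part of rdkafka); it mirrors the defaults used above and shows why sizes are taken with bytesize rather than length.

```ruby
# Hypothetical helper mirroring the defaults applied at the start of produce.
def normalize_produce_args(payload: nil, key: nil, partition: nil, timestamp: nil)
  {
    # Sizes are in bytes, not characters, because they describe raw buffers
    payload_size: payload.nil? ? 0 : payload.bytesize,
    key_size: key.nil? ? 0 : key.bytesize,
    # -1 lets the Kafka library choose the partition
    partition: partition.nil? ? -1 : partition,
    # 0 lets the Kafka library assign the timestamp
    timestamp: timestamp.nil? ? 0 : timestamp
  }
end

text = "h\u00E9llo"  # 5 characters, 6 bytes in UTF-8
args = normalize_produce_args(payload: text, key: "k")
puts text.length          # prints 5
puts args[:payload_size]  # prints 6
puts args[:partition]     # prints -1
puts args[:timestamp]     # prints 0
```

Passing the character count instead of the byte count would truncate any payload containing multi-byte characters.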