Class: WaterDrop::Producer

Inherits:
Object
Extended by:
Forwardable
Includes:
Karafka::Core::Helpers::Time, Async, Buffer, Sync, Transactions
Defined in:
lib/waterdrop/producer.rb,
lib/waterdrop/producer/sync.rb,
lib/waterdrop/producer/async.rb,
lib/waterdrop/producer/buffer.rb,
lib/waterdrop/producer/status.rb,
lib/waterdrop/producer/builder.rb,
lib/waterdrop/producer/transactions.rb

Overview

Main WaterDrop messages producer

Defined Under Namespace

Modules: Async, Buffer, Sync, Transactions
Classes: Builder, Status

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Transactions

#transaction, #transaction_mark_as_consumed, #transactional?

Methods included from Buffer

#buffer, #buffer_many, #flush_async, #flush_sync

Methods included from Async

#produce_async, #produce_many_async

Methods included from Sync

#produce_many_sync, #produce_sync

Constructor Details

#initialize(&block) ⇒ Producer

Creates a not-yet-configured instance of the producer

Parameters:

  • block (Proc)

    configuration block



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/waterdrop/producer.rb', line 40

# Creates a not-yet-configured producer instance. If a configuration block is
# given, it is immediately forwarded to #setup.
#
# @param block [Proc] configuration block
def initialize(&block)
  @operations_in_progress = Helpers::Counter.new

  # Locks guarding buffer access, client connection, operating and transactions
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  @status = Status.new
  @messages = []

  setup(&block) if block
end

Instance Attribute Details

#configObject (readonly)

Returns dry-configurable config object.

Returns:

  • (Object)

    dry-configurable config object



35
36
37
# File 'lib/waterdrop/producer.rb', line 35

# @return [Object] dry-configurable config object
def config = @config

#idString (readonly)

Returns uuid of the current producer.

Returns:

  • (String)

    uuid of the current producer



27
28
29
# File 'lib/waterdrop/producer.rb', line 27

# @return [String] uuid of the current producer
def id = @id

#messagesArray (readonly)

Returns internal messages buffer.

Returns:

  • (Array)

    internal messages buffer



31
32
33
# File 'lib/waterdrop/producer.rb', line 31

# @return [Array] internal messages buffer
def messages = @messages

#monitorObject (readonly)

Returns monitor we want to use.

Returns:

  • (Object)

    monitor we want to use



33
34
35
# File 'lib/waterdrop/producer.rb', line 33

# @return [Object] monitor we want to use
def monitor = @monitor

#statusStatus (readonly)

Returns producer status object.

Returns:

  • (Status)

    producer status object



29
30
31
# File 'lib/waterdrop/producer.rb', line 29

# @return [Status] producer status object
def status = @status

Instance Method Details

#clientRdkafka::Producer

Note:

Client is lazy initialized, keeping in mind also the fact of a potential fork that can happen any time.

Note:

It is not recommended to fork a producer that is already in use so in case of bootstrapping a cluster, it’s much better to fork configured but not used producers

Returns raw rdkafka producer.

Returns:

  • (Rdkafka::Producer)

    raw rdkafka producer

Raises:



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/waterdrop/producer.rb', line 76

# Lazily builds (or rebuilds after a process fork) the underlying rdkafka producer.
#
# @return [Rdkafka::Producer] raw rdkafka producer
# @raise [Errors::ProducerNotConfiguredError] when the producer was never configured
#   via #setup
# @raise [Errors::ProducerUsedInParentProcess] when a client built in the parent
#   process is referenced from a forked child
def client
  # Fast path: a client exists and we are still in the process that created it
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?

  @connecting_mutex.synchronize do
    # Double-checked: another thread may have built the client while we waited on the lock
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    ObjectSpace.define_finalizer(id, proc { close })

    # Remember the pid so a later fork can be detected by the checks above
    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    # Register statistics runner for this particular type of callbacks
    ::Karafka::Core::Instrumentation.statistics_callbacks.add(
      @id,
      Instrumentation::Callbacks::Statistics.new(@id, @client.name, @config.monitor)
    )

    # Register error tracking callback
    ::Karafka::Core::Instrumentation.error_callbacks.add(
      @id,
      Instrumentation::Callbacks::Error.new(@id, @client.name, @config.monitor)
    )

    @status.connected!
    @monitor.instrument('producer.connected', producer_id: id)
  end

  @client
end

#close(force: false) ⇒ Object

Flushes the buffers in a sync way and closes the producer

Parameters:

  • force (Boolean) (defaults to: false)

    should we force closing even with outstanding messages after the max wait timeout



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/waterdrop/producer.rb', line 155

# Flushes the buffers in a sync way and closes the producer.
#
# @param force [Boolean] should we force closing even with outstanding messages
#   after the max wait timeout (defaults to false)
def close(force: false)
  @operating_mutex.synchronize do
    # No-op when the producer was never connected or is already closing/closed
    return unless @status.active?

    @monitor.instrument(
      'producer.closed',
      producer_id: id
    ) do
      @status.closing!
      @monitor.instrument('producer.closing', producer_id: id)

      # No need for auto-gc if everything got closed by us
      # This should be used only in case a producer was not closed properly and forgotten
      ObjectSpace.undefine_finalizer(id)

      # We save this thread id because we need to bypass the activity verification on the
      # producer for final flush of buffers.
      @closing_thread_id = Thread.current.object_id

      # Wait until all the outgoing operations are done. Only when no one is using the
      # underlying client running operations we can close
      sleep(0.001) until @operations_in_progress.value.zero?

      # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
      # as we close the client after the flushing (even if blocked by the mutex)
      flush(true)

      # We should not close the client in several threads the same time
      # It is safe to run it several times but not exactly the same moment
      # We also mark it as closed only if it was connected, if not, it would trigger a new
      # connection that anyhow would be immediately closed
      if @client
        # Why do we trigger it early instead of just having `#close` do it?
        # The linger.ms time will be ignored for the duration of the call,
        # queued messages will be sent to the broker as soon as possible.
        begin
          # `max_wait_timeout` is in seconds at the moment
          @client.flush(@config.max_wait_timeout * 1_000) unless @client.closed?
        # We can safely ignore timeouts here because any left outstanding requests
        # will anyhow force wait on close if not forced.
        # If forced, we will purge the queue and just close
        rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
          nil
        ensure
          # Purge fully the local queue in case of a forceful shutdown just to be sure, that
          # there are no dangling messages. In case flush was successful, there should be
          # none but we do it just in case it timed out
          purge if force
        end

        @client.close

        @client = nil
      end

      # Remove callbacks runners that were registered
      ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)

      @status.closed!
    end
  end
end

#close!Object

Closes the producer with forced close after timeout, purging any outgoing data



220
221
222
# File 'lib/waterdrop/producer.rb', line 220

# Closes the producer with forced close after timeout, purging any outgoing data.
def close!
  close(force: true)
end

#partition_count(topic) ⇒ Integer

Note:

It uses the underlying `rdkafka-ruby` partition count fetch and cache.

Fetches and caches the partition count of a topic

Parameters:

  • topic (String)

    topic for which we want to get the number of partitions

Returns:

  • (Integer)

    number of partitions of the requested topic or -1 if number could not be retrieved.



133
134
135
# File 'lib/waterdrop/producer.rb', line 133

# Fetches and caches the partition count of a topic.
#
# @note Delegates to the underlying `rdkafka-ruby` partition count fetch and cache.
#
# @param topic [String] topic for which we want to get the number of partitions
# @return [Integer] number of partitions of the requested topic or -1 if the
#   number could not be retrieved
def partition_count(topic)
  topic_name = topic.to_s
  client.partition_count(topic_name)
end

#purgeObject

Note:

This is an operation that can cause data loss. Keep that in mind. It will not only purge the internal WaterDrop buffer but will also purge the librdkafka queue as well as will cancel any outgoing messages dispatches.

Purges data from both the buffer queue as well as the librdkafka queue.



142
143
144
145
146
147
148
149
150
# File 'lib/waterdrop/producer.rb', line 142

# Purges data from both the buffer queue as well as the librdkafka queue.
#
# @note This can cause data loss: it clears the internal WaterDrop buffer, purges
#   the librdkafka queue and cancels any outgoing message dispatches.
def purge
  @monitor.instrument('buffer.purged', producer_id: id) do
    # Drop our internal buffer under the buffer lock first
    @buffer_mutex.synchronize { @messages = [] }

    # Then drop whatever librdkafka still holds
    @client.purge
  end
end

#setup(&block) ⇒ Object

Sets up the whole configuration and initializes all that is needed

Parameters:

  • block (Block)

    configuration block

Raises:



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/waterdrop/producer.rb', line 57

# Sets up the whole configuration and initializes all that is needed.
#
# @param block [Block] configuration block
# @raise [Errors::ProducerAlreadyConfiguredError] when the producer was
#   configured before
def setup(&block)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  # Run the user configuration block and keep the resulting config object
  configured = Config.new.setup(&block)
  @config = configured.config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @status.configured!
end