Class: WaterDrop::Producer
- Inherits:
-
Object
- Object
- WaterDrop::Producer
- Defined in:
- lib/water_drop/producer.rb,
lib/water_drop/producer/sync.rb,
lib/water_drop/producer/async.rb,
lib/water_drop/producer/buffer.rb,
lib/water_drop/producer/status.rb,
lib/water_drop/producer/builder.rb,
lib/water_drop/producer/dummy_client.rb,
lib/water_drop/producer/statistics_decorator.rb
Overview
Main WaterDrop messages producer
Defined Under Namespace
Modules: Async, Buffer, Sync Classes: Builder, DummyClient, StatisticsDecorator, Status
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Dry-configurable config object.
-
#id ⇒ String
readonly
Uuid of the current producer.
-
#messages ⇒ Concurrent::Array
readonly
Internal messages buffer.
-
#monitor ⇒ Object
readonly
Monitor we want to use.
-
#status ⇒ Status
readonly
Producer status object.
Instance Method Summary collapse
-
#client ⇒ Rdkafka::Producer
Raw rdkafka producer.
-
#close ⇒ Object
Flushes the buffers in a sync way and closes the producer.
-
#ensure_active! ⇒ Object
Ensures that we don’t run any operations when the producer is not configured or when it was already closed.
-
#initialize(&block) ⇒ Producer
constructor
Creates a not-yet-configured instance of the producer.
-
#setup(&block) ⇒ Object
Sets up the whole configuration and initializes all that is needed.
-
#validate_message!(message) ⇒ Object
Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there.
Methods included from Buffer
#buffer, #buffer_many, #flush_async, #flush_sync
Methods included from Async
#produce_async, #produce_many_async
Methods included from Sync
#produce_many_sync, #produce_sync
Constructor Details
#initialize(&block) ⇒ Producer
Creates a not-yet-configured instance of the producer
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/water_drop/producer.rb', line 24 def initialize(&block) @buffer_mutex = Mutex.new @connecting_mutex = Mutex.new @closing_mutex = Mutex.new @status = Status.new = Concurrent::Array.new return unless block setup(&block) end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns dry-configurable config object.
19 20 21 |
# File 'lib/water_drop/producer.rb', line 19 def config @config end |
#id ⇒ String (readonly)
Returns uuid of the current producer.
11 12 13 |
# File 'lib/water_drop/producer.rb', line 11 def id @id end |
#messages ⇒ Concurrent::Array (readonly)
Returns internal messages buffer.
15 16 17 |
# File 'lib/water_drop/producer.rb', line 15 def end |
#monitor ⇒ Object (readonly)
Returns monitor we want to use.
17 18 19 |
# File 'lib/water_drop/producer.rb', line 17 def monitor @monitor end |
#status ⇒ Status (readonly)
Returns producer status object.
13 14 15 |
# File 'lib/water_drop/producer.rb', line 13 def status @status end |
Instance Method Details
#client ⇒ Rdkafka::Producer
Client is lazy initialized, keeping in mind also the fact of a potential fork that can happen any time.
It is not recommended to fork a producer that is already in use so in case of bootstrapping a cluster, it’s much better to fork configured but not used producers
Returns raw rdkafka producer.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/water_drop/producer.rb', line 58 def client return @client if @client && @pid == Process.pid # Don't allow to obtain a client reference for a producer that was not configured raise Errors::ProducerNotConfiguredError, id if @status.initial? @connecting_mutex.synchronize do return @client if @client && @pid == Process.pid # We should raise an error when trying to use a producer from a fork, that is already # connected to Kafka. We allow forking producers only before they are used raise Errors::ProducerUsedInParentProcess, Process.pid if @status.connected? # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent # process don't leak ObjectSpace.undefine_finalizer(id) # Finalizer tracking is needed for handling shutdowns gracefully. # I don't expect everyone to remember about closing all the producers all the time, thus # this approach is better. Although it is still worth keeping in mind, that this will # block GC from removing a no longer used producer unless closed properly but at least # won't crash the VM upon closing the process ObjectSpace.define_finalizer(id, proc { close }) @pid = Process.pid @client = Builder.new.call(self, @config) @status.connected! end @client end |
#close ⇒ Object
Flushes the buffers in a sync way and closes the producer
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/water_drop/producer.rb', line 90 def close @closing_mutex.synchronize do return unless @status.active? @monitor.instrument( 'producer.closed', producer: self ) do @status.closing! # No need for auto-gc if everything got closed by us # This should be used only in case a producer was not closed properly and forgotten ObjectSpace.undefine_finalizer(id) # Flush has it's own buffer mutex but even if it is blocked, flushing can still happen # as we close the client after the flushing (even if blocked by the mutex) flush(false) # We should not close the client in several threads the same time # It is safe to run it several times but not exactly the same moment client.close @status.closed! end end end |
#ensure_active! ⇒ Object
Ensures that we don’t run any operations when the producer is not configured or when it was already closed
119 120 121 122 123 124 125 126 127 |
# File 'lib/water_drop/producer.rb', line 119 def ensure_active! return if @status.active? raise Errors::ProducerNotConfiguredError, id if @status.initial? raise Errors::ProducerClosedError, id if @status.closing? || @status.closed? # This should never happen raise Errors::StatusInvalidError, [id, @status.to_s] end |
#setup(&block) ⇒ Object
Sets up the whole configuration and initializes all that is needed
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/water_drop/producer.rb', line 39 def setup(&block) raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial? @config = Config .new .setup(&block) .config @id = @config.id @monitor = @config.monitor @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size) @status.configured! end |
#validate_message!(message) ⇒ Object
Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there
133 134 135 136 137 138 139 140 141 |
# File 'lib/water_drop/producer.rb', line 133 def () result = @contract.call() return if result.success? raise Errors::MessageInvalidError, [ result.errors.to_h, ] end |