Class: Datadog::DataStreams::Processor
- Inherits: Core::Worker
  - Object
  - Core::Worker
  - Datadog::DataStreams::Processor
- Includes:
- Core::Workers::Polling
- Defined in:
- lib/datadog/data_streams/processor.rb
Overview
Processor for Data Streams Monitoring. This class is responsible for collecting and reporting pathway stats. Periodically (every interval, 10 seconds by default), it flushes stats to the Datadog agent.
Constant Summary
- PROPAGATION_KEY = 'dd-pathway-ctx-base64'
- DEFAULT_BUFFER_SIZE = 100_000
  Default buffer size for the lock-free event queue. Sized to handle high-throughput scenarios (e.g., 10k events/sec over a 10-second interval).
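PROPAGATION_KEY names the message header under which pathway context travels from producer to consumer. A minimal sketch of that round trip, assuming the context is an opaque byte string carried Base64-encoded in Kafka-style headers (the `headers` hash and the context bytes here are illustrative, not part of this API):

```ruby
require 'base64'

# Hypothetical opaque pathway context (the real context is a binary encoding).
context_bytes = "\x01\x02pathway-state".b

# Producer side: encode the context and attach it under the DSM propagation key.
headers = {}
headers['dd-pathway-ctx-base64'] = Base64.strict_encode64(context_bytes)

# Consumer side: read the header back and decode it.
decoded = Base64.strict_decode64(headers['dd-pathway-ctx-base64'])
```

The Base64 step matters because Kafka header values must survive transports that are not binary-safe.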
Constants included from Core::Workers::Polling
Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT
Instance Attribute Summary
- #bucket_size_ns ⇒ Object (readonly)
  Returns the value of attribute bucket_size_ns.
- #buckets ⇒ Object (readonly)
  Returns the value of attribute buckets.
- #pathway_context ⇒ Object (readonly)
  Returns the value of attribute pathway_context.
Attributes inherited from Core::Worker
Instance Method Summary
- #initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor (constructor)
  Initialize the Data Streams Monitoring processor.
- #perform ⇒ Object
  Called periodically by the worker to flush stats to the agent.
- #set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String
  Set a consume checkpoint.
- #set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String
  Set a produce checkpoint.
- #track_kafka_consume(topic, partition, offset, now) ⇒ Boolean
  Track Kafka message consumption for consumer lag monitoring.
- #track_kafka_produce(topic, partition, offset, now) ⇒ Boolean
  Track Kafka produce offset for lag monitoring.
Methods included from Core::Workers::Polling
#enabled=, #enabled?, included, #stop
Constructor Details
#initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor
Initialize the Data Streams Monitoring processor
# File 'lib/datadog/data_streams/processor.rb', line 41

def initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE)
  raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported?

  @settings = settings
  @agent_settings = agent_settings
  @logger = logger

  now = Core::Utils::Time.now
  @pathway_context = PathwayContext.new(
    hash_value: 0,
    pathway_start: now,
    current_edge_start: now
  )

  @bucket_size_ns = (interval * 1e9).to_i
  @buckets = {}
  @consumer_stats = []
  @stats_mutex = Mutex.new
  @event_buffer = Core::Buffer::CRuby.new(buffer_size)

  super()
  self.loop_base_interval = interval
  perform
end
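One detail worth noting in the constructor: the flush interval (in seconds) is converted to a bucket size in nanoseconds, so the default 10-second interval yields 10-billion-nanosecond stat buckets. A self-contained sketch of that conversion, plus the usual way a timestamp is aligned to the start of its bucket (the `align_to_bucket` helper is illustrative, not a method of this class):

```ruby
# Same conversion the constructor performs: seconds -> nanoseconds.
interval = 10
bucket_size_ns = (interval * 1e9).to_i

# Illustrative helper: align a nanosecond timestamp to the start of its bucket,
# so all events in the same window aggregate into one bucket key.
def align_to_bucket(timestamp_ns, bucket_size_ns)
  timestamp_ns - (timestamp_ns % bucket_size_ns)
end

aligned = align_to_bucket(12_345_678_901, bucket_size_ns)
```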
Instance Attribute Details
#bucket_size_ns ⇒ Object (readonly)
Returns the value of attribute bucket_size_ns.
# File 'lib/datadog/data_streams/processor.rb', line 30

def bucket_size_ns
  @bucket_size_ns
end
#buckets ⇒ Object (readonly)
Returns the value of attribute buckets.
# File 'lib/datadog/data_streams/processor.rb', line 30

def buckets
  @buckets
end
#pathway_context ⇒ Object (readonly)
Returns the value of attribute pathway_context.
# File 'lib/datadog/data_streams/processor.rb', line 30

def pathway_context
  @pathway_context
end
Instance Method Details
#perform ⇒ Object
Called periodically by the worker to flush stats to the agent
# File 'lib/datadog/data_streams/processor.rb', line 149

def perform
  process_events
  flush_stats
  true
end
#set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String
Set a consume checkpoint
# File 'lib/datadog/data_streams/processor.rb', line 131

def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &block)
  if block
    pathway_ctx = yield(PROPAGATION_KEY)
    if pathway_ctx
      decoded_ctx = decode_pathway_b64(pathway_ctx)
      set_pathway_context(decoded_ctx)
    end
  end

  edge_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
  edge_tags << 'manual_checkpoint:true' if manual_checkpoint
  edge_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  set_checkpoint(tags: edge_tags, span: span)
end
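The tag list a consume checkpoint reports has a fixed shape: base tags for type, topic, and direction, an optional manual-checkpoint marker, then any caller-supplied key:value pairs. That assembly can be reproduced in isolation (the `build_consume_tags` name is illustrative, not part of this API):

```ruby
# Illustrative reproduction of the consume-side edge tag assembly.
def build_consume_tags(type:, source:, manual_checkpoint: true, tags: {})
  edge_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
  edge_tags << 'manual_checkpoint:true' if manual_checkpoint
  edge_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?
  edge_tags
end

result = build_consume_tags(type: 'kafka', source: 'orders', tags: { 'team' => 'payments' })
```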
#set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String
Set a produce checkpoint
# File 'lib/datadog/data_streams/processor.rb', line 111

def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}, &block)
  edge_tags = ["type:#{type}", "topic:#{destination}", 'direction:out']
  edge_tags << 'manual_checkpoint:true' if manual_checkpoint
  edge_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  pathway = set_checkpoint(tags: edge_tags, span: span)
  yield(PROPAGATION_KEY, pathway) if pathway && block
  pathway
end
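On the produce side, the method yields the propagation key and the encoded pathway to the caller's block, which is expected to store the pair in the outgoing message's headers. A sketch of that calling pattern, with a stand-in `inject_pathway` method in place of the real processor:

```ruby
# Stand-in for the produce checkpoint's injection contract: yield (key, value)
# so the caller can place the encoded pathway into message headers.
def inject_pathway(encoded_pathway)
  yield('dd-pathway-ctx-base64', encoded_pathway) if encoded_pathway && block_given?
  encoded_pathway
end

headers = {}
inject_pathway('ZmFrZS1jdHg=') { |key, value| headers[key] = value }
```

Yielding rather than mutating headers directly keeps the processor transport-agnostic: the same contract works for Kafka headers, SQS message attributes, or any other carrier the caller controls.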
#track_kafka_consume(topic, partition, offset, now) ⇒ Boolean
Track Kafka message consumption for consumer lag monitoring
# File 'lib/datadog/data_streams/processor.rb', line 91

def track_kafka_consume(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_consume,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp: now
    }
  )
  true
end
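These per-partition offsets exist so consumer lag can be derived: the distance between the latest produced offset and the latest consumed offset on the same topic/partition. The arithmetic itself is simple; this `consumer_lag` helper is purely illustrative, as the actual lag computation is not performed by this method:

```ruby
# Illustrative lag computation from tracked offsets on one topic/partition.
def consumer_lag(latest_produced_offset, latest_consumed_offset)
  latest_produced_offset - latest_consumed_offset
end

lag = consumer_lag(1_500, 1_420)
```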
#track_kafka_produce(topic, partition, offset, now) ⇒ Boolean
Track Kafka produce offset for lag monitoring
# File 'lib/datadog/data_streams/processor.rb', line 72

def track_kafka_produce(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_produce,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp_ns: (now.to_f * 1e9).to_i
    }
  )
  true
end
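Note that the produce event stores its timestamp pre-converted to integer nanoseconds, whereas the consume event above passes the Time object through as-is. The conversion is a plain float-seconds-to-nanoseconds cast:

```ruby
# Same cast as the produce path: Time -> float seconds -> integer nanoseconds.
now = Time.at(1_700_000_000)
timestamp_ns = (now.to_f * 1e9).to_i
```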