Class: Datadog::DataStreams::Processor

Inherits:
Core::Worker
Includes:
Core::Workers::Polling
Defined in:
lib/datadog/data_streams/processor.rb

Overview

Processor for Data Streams Monitoring. This class collects pathway stats and periodically (every interval, 10 seconds by default) flushes them to the Datadog agent.

Constant Summary

PROPAGATION_KEY =
'dd-pathway-ctx-base64'
DEFAULT_BUFFER_SIZE =

Default buffer size for the lock-free event queue. Sized to handle high-throughput scenarios (e.g., 10,000 events/sec over the 10-second flush interval).

100_000

Constants included from Core::Workers::Polling

Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes inherited from Core::Worker

#task

Instance Method Summary

Methods included from Core::Workers::Polling

#enabled=, #enabled?, included, #stop

Constructor Details

#initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor

Initialize the Data Streams Monitoring processor

Parameters:

  • interval (Float)

    Flush interval in seconds (e.g., 10.0 for 10 seconds)

  • logger (Datadog::Core::Logger)

    Logger instance for debugging

  • settings (Datadog::Core::Configuration::Settings)

    Global configuration settings

  • agent_settings (Datadog::Core::Configuration::AgentSettings)

    Agent connection settings

  • buffer_size (Integer) (defaults to: DEFAULT_BUFFER_SIZE)

    Size of the lock-free event buffer for async stat collection (default: DEFAULT_BUFFER_SIZE). Higher values support more throughput but use more memory.

Raises:

  • (UnsupportedError)

    If DDSketch is not supported on the current platform
# File 'lib/datadog/data_streams/processor.rb', line 41

def initialize(interval:, logger:, settings:, agent_settings:, buffer_size: DEFAULT_BUFFER_SIZE)
  raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported?

  @settings = settings
  @agent_settings = agent_settings
  @logger = logger

  now = Core::Utils::Time.now
  @pathway_context = PathwayContext.new(
    hash_value: 0,
    pathway_start: now,
    current_edge_start: now
  )
  @bucket_size_ns = (interval * 1e9).to_i
  @buckets = {}
  @consumer_stats = []
  @stats_mutex = Mutex.new
  @event_buffer = Core::Buffer::CRuby.new(buffer_size)

  super()
  self.loop_base_interval = interval

  perform
end
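The constructor converts the flush interval (seconds) into the stats bucket size (nanoseconds) before storing it in @bucket_size_ns. A minimal sketch of that conversion, runnable without the Datadog gem:

```ruby
# The flush interval arrives in seconds; buckets are keyed in nanoseconds.
interval = 10.0                         # default flush interval in seconds
bucket_size_ns = (interval * 1e9).to_i  # same conversion as in #initialize
puts bucket_size_ns                     # 10000000000
```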

Instance Attribute Details

#bucket_size_ns ⇒ Object (readonly)

Returns the value of attribute bucket_size_ns.



# File 'lib/datadog/data_streams/processor.rb', line 30

def bucket_size_ns
  @bucket_size_ns
end

#buckets ⇒ Object (readonly)

Returns the value of attribute buckets.



# File 'lib/datadog/data_streams/processor.rb', line 30

def buckets
  @buckets
end

#pathway_context ⇒ Object (readonly)

Returns the value of attribute pathway_context.



# File 'lib/datadog/data_streams/processor.rb', line 30

def pathway_context
  @pathway_context
end

Instance Method Details

#perform ⇒ Object

Called periodically by the worker to flush stats to the agent



# File 'lib/datadog/data_streams/processor.rb', line 149

def perform
  process_events
  flush_stats
  true
end

#set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String

Set a consume checkpoint

Parameters:

  • type (String)

    The type of the checkpoint (e.g., ‘kafka’, ‘kinesis’, ‘sns’)

  • source (String)

    The source (e.g., topic, exchange, stream name)

  • manual_checkpoint (Boolean) (defaults to: true)

    Whether this checkpoint was manually set (default: true)

  • tags (Hash) (defaults to: {})

    Additional tags to include

Yields:

  • (key)

    Block to extract context from carrier

Returns:

  • (String)

    Base64 encoded pathway context



# File 'lib/datadog/data_streams/processor.rb', line 131

def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &block)
  if block
    pathway_ctx = yield(PROPAGATION_KEY)
    if pathway_ctx
      decoded_ctx = decode_pathway_b64(pathway_ctx)
      set_pathway_context(decoded_ctx)
    end
  end

  checkpoint_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
  checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
  checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  set_checkpoint(tags: checkpoint_tags, span: span)
end
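The yielded block acts as a carrier reader: the method passes PROPAGATION_KEY to the block and expects the base64-encoded pathway context (or nil) back. A minimal sketch of that protocol, with hypothetical message headers and no Datadog gem required:

```ruby
PROPAGATION_KEY = 'dd-pathway-ctx-base64'

# Hypothetical headers from a consumed Kafka message; the value is a
# made-up base64 string standing in for a real encoded pathway context.
headers = { 'dd-pathway-ctx-base64' => 'ZmFrZS1jdHg=' }

# set_consume_checkpoint yields the key; the block returns the stored value.
reader = ->(key) { headers[key] }
puts reader.call(PROPAGATION_KEY)  # ZmFrZS1jdHg=
```

In real usage the block would read from the actual message, e.g. `processor.set_consume_checkpoint(type: 'kafka', source: topic) { |key| message.headers[key] }` (where `processor` and `message` are assumed to exist in the caller's context).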

#set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String

Set a produce checkpoint

Parameters:

  • type (String)

    The type of the checkpoint (e.g., ‘kafka’, ‘kinesis’, ‘sns’)

  • destination (String)

    The destination (e.g., topic, exchange, stream name)

  • manual_checkpoint (Boolean) (defaults to: true)

    Whether this checkpoint was manually set (default: true)

  • tags (Hash) (defaults to: {})

    Additional tags to include

Yields:

  • (key, value)

    Block to inject context into carrier

Returns:

  • (String)

    Base64 encoded pathway context



# File 'lib/datadog/data_streams/processor.rb', line 111

def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}, &block)
  checkpoint_tags = ["type:#{type}", "topic:#{destination}", 'direction:out']
  checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
  checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  pathway = set_checkpoint(tags: checkpoint_tags, span: span)

  yield(PROPAGATION_KEY, pathway) if pathway && block

  pathway
end
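The tag list assembled above is plain string formatting; this sketch reproduces it outside the class (no Datadog gem required) with hypothetical argument values:

```ruby
type = 'kafka'
destination = 'orders'
manual_checkpoint = true
tags = { partition: 3 }

# Same assembly as in set_produce_checkpoint: base tags, then the
# manual-checkpoint marker, then any caller-supplied tags.
checkpoint_tags = ["type:#{type}", "topic:#{destination}", 'direction:out']
checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

p checkpoint_tags
# ["type:kafka", "topic:orders", "direction:out", "manual_checkpoint:true", "partition:3"]
```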

#track_kafka_consume(topic, partition, offset, now) ⇒ Boolean

Track Kafka message consumption for consumer lag monitoring

Parameters:

  • topic (String)

    The Kafka topic name

  • partition (Integer)

    The partition number

  • offset (Integer)

    The offset of the consumed message

  • now (Time)

    Timestamp

Returns:

  • (Boolean)

    true if tracking succeeded



# File 'lib/datadog/data_streams/processor.rb', line 91

def track_kafka_consume(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_consume,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp: now
    }
  )
  true
end

#track_kafka_produce(topic, partition, offset, now) ⇒ Boolean

Track Kafka produce offset for lag monitoring

Parameters:

  • topic (String)

    The Kafka topic name

  • partition (Integer)

    The partition number

  • offset (Integer)

    The offset of the produced message

  • now (Time)

    Timestamp

Returns:

  • (Boolean)

    true if tracking succeeded



# File 'lib/datadog/data_streams/processor.rb', line 72

def track_kafka_produce(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_produce,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp_ns: (now.to_f * 1e9).to_i
    }
  )
  true
end
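Note that the produce event stores its timestamp as integer nanoseconds under :timestamp_ns, whereas the consume event above stores the raw Time under :timestamp. The produce-side conversion can be checked in plain Ruby (a fixed epoch time is used here for reproducibility):

```ruby
now = Time.at(1_700_000_000)          # fixed time for a reproducible example
timestamp_ns = (now.to_f * 1e9).to_i  # seconds -> nanoseconds, as in the event hash
puts timestamp_ns                     # 1700000000000000000
```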