Module: Pigeon::Tracing::Messaging

Defined in:
lib/pigeon/tracing/messaging.rb

Overview

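Messaging-specific tracing helpers. They build span attributes for Kafka messages and wrap publish, process, and batch-process operations in spans.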

Class Method Details

.add_process_span_attributes(span, message) ⇒ void

This method returns an undefined value.

Adds the Kafka partition and retry count to a process span. Does nothing when the span is nil or the message has no partition.

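Parameters:

  • span (OpenTelemetry::Span, nil)

    Span to annotate; ignored when nil

  • message (Object)

    Message being processed; must respond to partition and retry_count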


# File 'lib/pigeon/tracing/messaging.rb', line 115

def self.add_process_span_attributes(span, message)
  return unless span && message.partition

  span.add_attributes({
                        "messaging.kafka.partition" => message.partition,
                        "messaging.kafka.message.retry_count" => message.retry_count
                      })
end
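
The guard clause is easy to misread, so here is a minimal sketch of its behavior. `RecordingSpan`, `DemoRecord`, and `demo_add_process_span_attributes` are hypothetical stand-ins (not Pigeon or OpenTelemetry classes), and the method body is copied locally so the example runs without the gem.

```ruby
# Hypothetical span that just records attributes; not an OpenTelemetry class.
class RecordingSpan
  attr_reader :attributes

  def initialize
    @attributes = {}
  end

  def add_attributes(extra)
    @attributes.merge!(extra)
    self
  end
end

# Hypothetical message exposing only the readers this method needs.
DemoRecord = Struct.new(:partition, :retry_count, keyword_init: true)

# Local copy of the documented method body.
def demo_add_process_span_attributes(span, message)
  return unless span && message.partition

  span.add_attributes({
    "messaging.kafka.partition" => message.partition,
    "messaging.kafka.message.retry_count" => message.retry_count
  })
end

# Partition 0 is truthy in Ruby, so it passes the guard.
assigned = RecordingSpan.new
demo_add_process_span_attributes(assigned, DemoRecord.new(partition: 0, retry_count: 2))

# A nil partition skips the call entirely, so retry_count is not recorded either.
unassigned = RecordingSpan.new
demo_add_process_span_attributes(unassigned, DemoRecord.new(partition: nil, retry_count: 2))

# A nil span is tolerated and the method returns nil.
no_span = demo_add_process_span_attributes(nil, DemoRecord.new(partition: 1, retry_count: 0))

p assigned.attributes
p unassigned.attributes
p no_span
```

One consequence worth noting: because both attributes sit behind the same guard, a message without a partition never gets its retry count recorded.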

.process_span_attributes(message) ⇒ Hash

Builds the base span attributes for processing a message. The correlation ID is included only when the message has one.

Parameters:

  • message (Object)

    Message being processed; must respond to topic, key, id, and correlation_id

Returns:

  • (Hash)

    Span attributes



# File 'lib/pigeon/tracing/messaging.rb', line 95

def self.process_span_attributes(message)
  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => message.topic,
    "messaging.destination_kind" => "topic",
    "messaging.operation" => "process",
    "messaging.kafka.key" => message.key,
    "messaging.message_id" => message.id.to_s
  }

  # Add correlation ID if available
  attributes["messaging.correlation_id"] = message.correlation_id if message.correlation_id

  attributes
end
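
As a rough illustration of the resulting hash, the sketch below copies the method body locally so it runs without the gem. `DemoMessage` and `demo_process_span_attributes` are hypothetical names introduced for this example; the real message class is assumed only to respond to the same readers.

```ruby
# Hypothetical stand-in for a consumed message; not a Pigeon class.
DemoMessage = Struct.new(:topic, :key, :id, :correlation_id, keyword_init: true)

# Local copy of the documented method body.
def demo_process_span_attributes(message)
  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => message.topic,
    "messaging.destination_kind" => "topic",
    "messaging.operation" => "process",
    "messaging.kafka.key" => message.key,
    "messaging.message_id" => message.id.to_s
  }

  attributes["messaging.correlation_id"] = message.correlation_id if message.correlation_id
  attributes
end

with_id = demo_process_span_attributes(
  DemoMessage.new(topic: "orders", key: "order-42", id: 7, correlation_id: "req-1")
)
without_id = demo_process_span_attributes(
  DemoMessage.new(topic: "orders", key: nil, id: 8)
)

p with_id["messaging.message_id"]              # the id is stringified
p with_id.key?("messaging.correlation_id")     # present when a correlation ID exists
p without_id.key?("messaging.correlation_id")  # omitted when it is nil
p without_id.key?("messaging.kafka.key")       # always present, even when the key is nil
```

Unlike the correlation ID, the message key is set unconditionally here, so a nil key ends up in the hash as a nil value.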

.publish_span_attributes(topic, payload, key, correlation_id) ⇒ Hash

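Builds the span attributes for publishing a message. The correlation ID, when given, is recorded under messaging.message_id, and the payload size is recorded only when the payload responds to bytesize.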

Parameters:

  • topic (String)

    Kafka topic

  • payload (Hash, String)

    Message payload

  • key (String, nil)

    Optional message key

  • correlation_id (String, nil)

    Optional correlation ID

Returns:

  • (Hash)

    Span attributes



# File 'lib/pigeon/tracing/messaging.rb', line 47

def self.publish_span_attributes(topic, payload, key, correlation_id)
  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => topic,
    "messaging.destination_kind" => "topic",
    "messaging.operation" => "publish"
  }

  # Add optional attributes
  attributes["messaging.message_id"] = correlation_id if correlation_id
  attributes["messaging.kafka.key"] = key if key
  attributes["messaging.message_payload_size_bytes"] = payload.bytesize if payload.respond_to?(:bytesize)

  attributes
end
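
The optional attributes depend on the argument types, which the following sketch tries to make concrete. `demo_publish_span_attributes` is a hypothetical local copy of the method body so the example runs without the gem, and the topic, key, and payload values are made up.

```ruby
require "json"

# Local copy of the documented method body.
def demo_publish_span_attributes(topic, payload, key, correlation_id)
  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => topic,
    "messaging.destination_kind" => "topic",
    "messaging.operation" => "publish"
  }

  attributes["messaging.message_id"] = correlation_id if correlation_id
  attributes["messaging.kafka.key"] = key if key
  attributes["messaging.message_payload_size_bytes"] = payload.bytesize if payload.respond_to?(:bytesize)
  attributes
end

# String payload: size is counted in bytes, not characters.
string_attrs = demo_publish_span_attributes("orders", "h\u00E9llo", "k1", "req-1")

# Hash payload: Hash does not respond to bytesize, so no size is recorded.
hash_attrs = demo_publish_span_attributes("orders", { "id" => 1 }, nil, nil)

# Serializing first is one way to get a size for structured payloads.
json_attrs = demo_publish_span_attributes("orders", JSON.generate({ "id" => 1 }), nil, nil)

p string_attrs["messaging.message_payload_size_bytes"]
p string_attrs["messaging.message_id"]
p hash_attrs.keys
p json_attrs["messaging.message_payload_size_bytes"]
```

If payload size matters for Hash payloads, serializing before the call (as in the last case) appears to be the simplest option, though whether Pigeon does this elsewhere is not shown on this page.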

.trace_batch_process(batch_size) {|span| ... } ⇒ Object

Creates a span around the processing of a batch of messages. When tracing is unavailable, the block is called directly with nil.

Parameters:

  • batch_size (Integer)

    Number of messages to process

Yields:

  • (span)

    Block to execute within the span

Yield Parameters:

  • span (OpenTelemetry::Span)

    Active span, or nil when tracing is unavailable

Returns:

  • (Object)

    Result of the block



# File 'lib/pigeon/tracing/messaging.rb', line 129

def self.trace_batch_process(batch_size, &)
  return yield(nil) unless Core.available?

  # Create span attributes
  attributes = {
    "messaging.system" => "kafka",
    "messaging.operation" => "batch_process",
    "messaging.batch.size" => batch_size
  }

  # Create the span
  Core.with_span("process_batch", attributes: attributes, &)
end

.trace_process(message) {|span| ... } ⇒ Object

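Creates a consumer span for processing a message. If the message has headers, the trace context extracted from them becomes the parent of the span.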

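Parameters:

  • message (Object)

    Message being processed; must respond to headers, topic, key, id, correlation_id, partition, and retry_count
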
Yields:

  • (span)

    Block to execute within the span

Yield Parameters:

  • span (OpenTelemetry::Span)

    Active span, or nil when tracing is unavailable

Returns:

  • (Object)

    Result of the block



# File 'lib/pigeon/tracing/messaging.rb', line 68

def self.trace_process(message)
  return yield(nil) unless Core.available?

  # Extract trace context from message headers if available
  parent_context = message.headers ? Core.extract_context(message.headers) : nil

  # Create span attributes
  attributes = process_span_attributes(message)

  # Create the span
  Core.with_span(
    "process_message",
    attributes: attributes,
    kind: :consumer,
    parent_context: parent_context
  ) do |span|
    # Add additional attributes
    add_process_span_attributes(span, message)

    # Yield to the block with the span
    yield span
  end
end
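
To make the control flow easier to follow, here is a simplified sketch that runs the same steps against stand-ins. Everything prefixed with `Demo` is hypothetical: `DemoCore` only imitates the `Core` calls used above (its real behavior is not documented on this page), the parent context is faked as a plain header value, and the attribute hash is abbreviated.

```ruby
# Hypothetical span; not an OpenTelemetry class.
DemoSpan = Struct.new(:name, :kind, :parent_context, :attributes) do
  def add_attributes(extra)
    attributes.merge!(extra)
  end
end

# Hypothetical imitation of the Core calls used by trace_process.
module DemoCore
  def self.available?
    true
  end

  # Assumption: pretend the parent context is the raw header value.
  def self.extract_context(headers)
    headers["traceparent"]
  end

  def self.with_span(name, attributes:, kind:, parent_context:)
    yield DemoSpan.new(name, kind, parent_context, attributes.dup)
  end
end

DemoInbound = Struct.new(:topic, :headers, :partition, :retry_count, keyword_init: true)

# Same sequence of steps as the documented method, against the stand-ins.
def demo_trace_process(message)
  return yield(nil) unless DemoCore.available?

  parent_context = message.headers ? DemoCore.extract_context(message.headers) : nil
  attributes = { "messaging.destination" => message.topic, "messaging.operation" => "process" }

  DemoCore.with_span("process_message", attributes: attributes, kind: :consumer,
                                        parent_context: parent_context) do |span|
    if message.partition
      span.add_attributes("messaging.kafka.partition" => message.partition,
                          "messaging.kafka.message.retry_count" => message.retry_count)
    end
    yield span
  end
end

message = DemoInbound.new(topic: "orders", headers: { "traceparent" => "demo-parent" },
                          partition: 2, retry_count: 1)
seen = nil
result = demo_trace_process(message) do |span|
  seen = span
  :handled
end

p result
p seen.kind
p seen.parent_context
p seen.attributes
```

In this sketch the block result is passed through as the return value, which matches the documented return of "Result of the block", assuming the real `Core.with_span` also returns whatever its block returns.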

.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil) {|span, context, headers| ... } ⇒ Object

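Creates a producer span for publishing a message and injects the trace context into a copy of the headers. When tracing is unavailable, the block is called with a nil span, a nil context, and the original headers (or an empty Hash).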

Parameters:

  • topic (String)

    Kafka topic

  • payload (Hash, String)

    Message payload

  • key (String, nil) (defaults to: nil)

    Optional message key

  • headers (Hash, nil) (defaults to: nil)

    Optional message headers

  • correlation_id (String, nil) (defaults to: nil)

    Optional correlation ID

Yields:

  • (span, context, headers)

    Block to execute within the span

Yield Parameters:

  • span (OpenTelemetry::Span)

    Active span, or nil when tracing is unavailable

  • context (OpenTelemetry::Context)

    Current context carrying the span, or nil when tracing is unavailable

  • headers (Hash)

    Copy of the headers with trace context injected; the headers as passed (or an empty Hash) when tracing is unavailable

Returns:

  • (Object)

    Result of the block



# File 'lib/pigeon/tracing/messaging.rb', line 18

def self.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil)
  return yield(nil, nil, headers || {}) unless Core.available?

  # Create span attributes
  attributes = publish_span_attributes(topic, payload, key, correlation_id)

  # Create the span
  Core.with_span("publish_message", attributes: attributes, kind: :producer) do |span|
    # Get the current context with the span
    context = OpenTelemetry::Context.current

    # Inject trace context into headers
    trace_headers = headers ? headers.dup : {}
    Core.inject_context(trace_headers, context)

    # Add correlation ID to span
    span.add_attributes("messaging.correlation_id" => correlation_id) if correlation_id

    # Yield to the block with the span, context, and headers
    yield span, context, trace_headers
  end
end
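
Because the block can receive a nil span and a nil context, callers may want to write it defensively. The sketch below exercises only the fallback branch (the first line of the method above). `DemoMessaging` is a hypothetical stand-in that mirrors that branch, and the `delivered` array stands in for a real producer call.

```ruby
# Hypothetical stand-in mirroring only the documented fallback branch,
# i.e. what the block receives when Core.available? is false.
module DemoMessaging
  def self.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil)
    yield(nil, nil, headers || {})
  end
end

delivered = []
caller_headers = { "x-tenant" => "acme" }

result = DemoMessaging.trace_publish(topic: "orders", payload: "{}", headers: caller_headers) do |span, _context, headers|
  # Safe navigation keeps the block valid when no span exists.
  span&.add_attributes("app.tenant" => "acme")

  # Always send the yielded headers, not the originals, so that trace
  # context is forwarded whenever tracing is active.
  delivered << { topic: "orders", headers: headers }
  :queued
end

p result
p delivered.first[:headers]
p delivered.first[:headers].equal?(caller_headers)
```

Note the asymmetry in the source above: the traced branch yields a duplicate of the headers, while the fallback yields the caller's own Hash, so mutating the yielded headers inside the block can affect the caller's object only in the fallback case.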