Module: Pigeon::Tracing::Messaging
- Defined in:
- lib/pigeon/tracing/messaging.rb
Overview
Messaging-specific tracing functionality
Class Method Summary
- .add_process_span_attributes(span, message) ⇒ void
  Add additional attributes to a process span.
- .process_span_attributes(message) ⇒ Hash
  Create span attributes for processing.
- .publish_span_attributes(topic, payload, key, correlation_id) ⇒ Hash
  Create span attributes for publishing.
- .trace_batch_process(batch_size) {|span| ... } ⇒ Object
  Create a span for batch processing.
- .trace_process(message) {|span| ... } ⇒ Object
  Create a span for processing a message.
- .trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil) {|span, context, headers| ... } ⇒ Object
  Create a span for publishing a message.
Class Method Details
.add_process_span_attributes(span, message) ⇒ void
This method returns an undefined value.
Add the Kafka partition and retry count to a process span. Does nothing when span is nil or the message has no partition.

    # File 'lib/pigeon/tracing/messaging.rb', line 115

    def self.add_process_span_attributes(span, message)
      return unless span && message.partition

      span.add_attributes({
        "messaging.kafka.partition" => message.partition,
        "messaging.kafka.message.retry_count" => message.retry_count
      })
    end
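The guard in this method means the retry count is only recorded when a partition is present. The following is a minimal sketch of that behavior, not the library itself: RecordingSpan, PartitionedMessage, and demo_add_process_span_attributes are hypothetical stand-ins that copy the documented logic so the example runs without the Pigeon gem.

```ruby
# Hypothetical span stand-in; it implements only add_attributes,
# the single span call the documented method makes.
class RecordingSpan
  attr_reader :attributes

  def initialize
    @attributes = {}
  end

  def add_attributes(attrs)
    @attributes.merge!(attrs)
  end
end

# Hypothetical message stand-in with just the two readers that are used.
PartitionedMessage = Struct.new(:partition, :retry_count, keyword_init: true)

# Standalone copy of the documented logic (assumption: same behavior).
def demo_add_process_span_attributes(span, message)
  return unless span && message.partition

  span.add_attributes({
    "messaging.kafka.partition" => message.partition,
    "messaging.kafka.message.retry_count" => message.retry_count
  })
end

assigned = RecordingSpan.new
demo_add_process_span_attributes(assigned, PartitionedMessage.new(partition: 0, retry_count: 1))

unassigned = RecordingSpan.new
demo_add_process_span_attributes(unassigned, PartitionedMessage.new(partition: nil, retry_count: 2))

p assigned.attributes   # both attributes recorded
p unassigned.attributes # nothing recorded, retry count included
```

Because 0 is truthy in Ruby, partition 0 passes the guard; only a nil (or false) partition skips the attributes.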
.process_span_attributes(message) ⇒ Hash
Create span attributes for processing. The messaging.correlation_id attribute is included only when the message has a correlation ID.

    # File 'lib/pigeon/tracing/messaging.rb', line 95

    def self.process_span_attributes(message)
      attributes = {
        "messaging.system" => "kafka",
        "messaging.destination" => message.topic,
        "messaging.destination_kind" => "topic",
        "messaging.operation" => "process",
        "messaging.kafka.key" => message.key,
        "messaging.message_id" => message.id.to_s
      }

      # Add correlation ID if available
      attributes["messaging.correlation_id"] = message.correlation_id if message.correlation_id

      attributes
    end
.publish_span_attributes(topic, payload, key, correlation_id) ⇒ Hash
Create span attributes for publishing. The message ID (taken from correlation_id), the Kafka key, and the payload size are added only when available; the payload size requires a payload that responds to bytesize.

    # File 'lib/pigeon/tracing/messaging.rb', line 47

    def self.publish_span_attributes(topic, payload, key, correlation_id)
      attributes = {
        "messaging.system" => "kafka",
        "messaging.destination" => topic,
        "messaging.destination_kind" => "topic",
        "messaging.operation" => "publish"
      }

      # Add optional attributes
      attributes["messaging.message_id"] = correlation_id if correlation_id
      attributes["messaging.kafka.key"] = key if key
      attributes["messaging.message_payload_size_bytes"] = payload.bytesize if payload.respond_to?(:bytesize)

      attributes
    end
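To make the optional attributes concrete, the sketch below copies the documented logic into a standalone function and calls it with a String payload and with a Hash payload. The name demo_publish_span_attributes and the sample topic, key, and correlation ID are hypothetical; this is an illustration of the source shown above, not a call into the Pigeon gem.

```ruby
# Standalone copy of the documented attribute-building logic
# (hypothetical name, so it runs without the Pigeon gem).
def demo_publish_span_attributes(topic, payload, key, correlation_id)
  attributes = {
    "messaging.system" => "kafka",
    "messaging.destination" => topic,
    "messaging.destination_kind" => "topic",
    "messaging.operation" => "publish"
  }

  # Optional attributes are added only when a value is available.
  attributes["messaging.message_id"] = correlation_id if correlation_id
  attributes["messaging.kafka.key"] = key if key
  attributes["messaging.message_payload_size_bytes"] = payload.bytesize if payload.respond_to?(:bytesize)

  attributes
end

# A String payload responds to bytesize, so its size is recorded.
with_string = demo_publish_span_attributes("orders", '{"id":1}', "order-1", "corr-42")

# A Hash payload does not respond to bytesize, so the size attribute is omitted.
with_hash = demo_publish_span_attributes("orders", { id: 1 }, nil, nil)

p with_string
p with_hash
```

If payloads are passed as Ruby objects rather than serialized strings, the size attribute will be missing from the span; serializing before the call is one way to keep it.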
.trace_batch_process(batch_size) {|span| ... } ⇒ Object
Create a span for batch processing. When tracing is unavailable (Core.available? is false), the block is called with nil instead of a span.

    # File 'lib/pigeon/tracing/messaging.rb', line 129

    def self.trace_batch_process(batch_size, &)
      return yield(nil) unless Core.available?

      # Create span attributes
      attributes = {
        "messaging.system" => "kafka",
        "messaging.operation" => "batch_process",
        "messaging.batch.size" => batch_size
      }

      # Create the span
      Core.with_span("process_batch", attributes: attributes, &)
    end
.trace_process(message) {|span| ... } ⇒ Object
Create a consumer span for processing a message. The parent trace context is extracted from the message headers when present. When tracing is unavailable, the block is called with nil instead of a span.

    # File 'lib/pigeon/tracing/messaging.rb', line 68

    def self.trace_process(message)
      return yield(nil) unless Core.available?

      # Extract trace context from message headers if available
      parent_context = message.headers ? Core.extract_context(message.headers) : nil

      # Create span attributes
      attributes = process_span_attributes(message)

      # Create the span
      Core.with_span(
        "process_message",
        attributes: attributes,
        kind: :consumer,
        parent_context: parent_context
      ) do |span|
        # Add additional attributes
        add_process_span_attributes(span, message)

        # Yield to the block with the span
        yield span
      end
    end
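Since the block receives nil when tracing is unavailable, caller code should guard every span call. The sketch below shows one way to write such a handler. NoopMessagingTracer is a hypothetical stand-in that reproduces only the documented fallback path, InboundMessage is a hypothetical message type, and the attribute name app.payload_bytes is made up for illustration. In an application the tracer argument would presumably be Pigeon::Tracing::Messaging.

```ruby
# Hypothetical stand-in: mimics only the fallback path of trace_process,
# where tracing is unavailable and the block receives nil.
module NoopMessagingTracer
  def self.trace_process(_message)
    yield(nil)
  end
end

# Hypothetical message type with the readers this sketch needs.
InboundMessage = Struct.new(:topic, :payload, :headers, keyword_init: true)

def handle(message, tracer: NoopMessagingTracer)
  tracer.trace_process(message) do |span|
    # span may be nil, so use safe navigation for every span call.
    span&.add_attributes("app.payload_bytes" => message.payload.bytesize)

    # Application work happens inside the block so the span covers it.
    message.payload.upcase
  end
end

result = handle(InboundMessage.new(topic: "orders", payload: "hello", headers: nil))
p result
```

On the fallback path the method returns the block's value. With tracing enabled the return value depends on Core.with_span, which is not shown in this section.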
.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil) {|span, context, headers| ... } ⇒ Object
Create a producer span for publishing a message. The block receives the span, the current OpenTelemetry context, and a copy of headers with the trace context injected. When tracing is unavailable, the block receives nil, nil, and the original headers (or an empty Hash).

    # File 'lib/pigeon/tracing/messaging.rb', line 18

    def self.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil)
      return yield(nil, nil, headers || {}) unless Core.available?

      # Create span attributes
      attributes = publish_span_attributes(topic, payload, key, correlation_id)

      # Create the span
      Core.with_span("publish_message", attributes: attributes, kind: :producer) do |span|
        # Get the current context with the span
        context = OpenTelemetry::Context.current

        # Inject trace context into headers
        trace_headers = headers ? headers.dup : {}
        Core.inject_context(trace_headers, context)

        # Add correlation ID to span
        span.add_attributes("messaging.correlation_id" => correlation_id) if correlation_id

        # Yield to the block with the span, context, and headers
        yield span, context, trace_headers
      end
    end
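The headers yielded to the block are the ones that carry the trace context, so a publisher should send those rather than the Hash it passed in. The sketch below illustrates that pattern under stated assumptions: NoopPublishTracer is a hypothetical stand-in for the fallback path only, and the producer is modeled as a plain Array that collects outgoing records. In an application the tracer argument would presumably be Pigeon::Tracing::Messaging.

```ruby
# Hypothetical stand-in: mimics only the fallback path of trace_publish,
# which yields nil, nil, and the caller's headers (or an empty Hash).
module NoopPublishTracer
  def self.trace_publish(topic:, payload:, key: nil, headers: nil, correlation_id: nil)
    yield(nil, nil, headers || {})
  end
end

# `producer` is modeled as anything that accepts << (here, an Array).
def publish_with_tracing(producer, topic:, payload:, headers: nil, tracer: NoopPublishTracer)
  tracer.trace_publish(topic: topic, payload: payload, headers: headers) do |_span, _context, trace_headers|
    # Send the yielded headers, not the original ones: with tracing enabled
    # they are a copy that also contains the injected trace context.
    producer << { topic: topic, payload: payload, headers: trace_headers }
  end
end

sent = []
publish_with_tracing(sent, topic: "orders", payload: "hi", headers: { "source" => "demo" })
publish_with_tracing(sent, topic: "orders", payload: "no headers")

p sent
```

Passing no headers is safe on the fallback path as well, since the block then receives an empty Hash rather than nil.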