Class: Pigeon::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/pigeon/publisher.rb

Overview

Publisher class for storing messages in the outbox

Defined Under Namespace

Classes: MockOutboxMessage

Instance Method Summary collapse

Constructor Details

#initializePublisher

Initialize a new publisher



22
23
24
# File 'lib/pigeon/publisher.rb', line 22

def initialize
  @metrics = Pigeon.config.metrics_collector || Pigeon::Metrics::Collector.new
end

Instance Method Details

#publish(topic:, payload:, **options) ⇒ Pigeon::Models::OutboxMessage

Publish a message to Kafka via the outbox pattern

Parameters:

  • topic (String)

    Kafka topic

  • payload (Hash, String)

    Message payload

  • key (String, nil)

    Optional message key

  • headers (Hash, nil)

    Optional message headers

  • sync (Boolean)

    Whether to attempt immediate publishing

  • partition (Integer, nil)

    Optional specific partition

  • correlation_id (String, nil)

    Optional correlation ID for tracing

  • max_retries (Integer, nil)

    Optional override for max retries

  • schema_name (String, Symbol, nil)

    Optional schema name for validation

  • schema (Hash, String, nil)

    Optional schema for validation

  • encrypt (Boolean)

    Whether to encrypt the payload

  • encryption_key (String, nil)

    Optional encryption key

  • sensitive_fields (Array<String, Symbol>, nil)

    Optional sensitive fields to mask in logs

Returns:



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/pigeon/publisher.rb', line 41

def publish(topic:, payload:, **options)
  Pigeon::Tracing.trace_publish(
    topic: topic,
    payload: payload,
    key: options[:key],
    headers: options[:headers],
    correlation_id: options[:correlation_id]
  ) do |span, _context, trace_headers|
    start_time = Time.now
    correlation_id = prepare_publish_context(topic, payload, options, span)
    serialized_payload = prepare_payload(payload, options)
    merged_headers = merge_trace_headers(options[:headers], trace_headers)

    message_attributes = build_message_attributes(
      topic: topic,
      key: options[:key],
      headers: merged_headers,
      partition: options[:partition],
      payload: serialized_payload,
      correlation_id: correlation_id,
      max_retries: options[:max_retries]
    )

    outbox_message = create_outbox_message_in_transaction(message_attributes)
    handle_publish_result(outbox_message, serialized_payload, topic, start_time, span, options)

    outbox_message
  end
end

#publish_direct(topic:, payload:, **options) ⇒ Boolean

Publish a message directly to Kafka without using the outbox pattern This is useful for non-critical messages where at-least-once delivery is not required

Parameters:

  • topic (String)

    Kafka topic

  • payload (Hash, String)

    Message payload

  • key (String, nil)

    Optional message key

  • headers (Hash, nil)

    Optional message headers

  • partition (Integer, nil)

    Optional specific partition

  • schema_name (String, Symbol, nil)

    Optional schema name for validation

  • schema (Hash, String, nil)

    Optional schema for validation

  • encrypt (Boolean)

    Whether to encrypt the payload

  • encryption_key (String, nil)

    Optional encryption key

  • sensitive_fields (Array<String, Symbol>, nil)

    Optional sensitive fields to mask in logs

  • correlation_id (String, nil)

    Optional correlation ID for tracing

Returns:

  • (Boolean)

    Whether the publish was successful



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/pigeon/publisher.rb', line 85

def publish_direct(topic:, payload:, **options)
  Pigeon::Tracing.trace_publish(
    topic: topic,
    payload: payload,
    key: options[:key],
    headers: options[:headers],
    correlation_id: options[:correlation_id]
  ) do |span, _context, trace_headers|
    execute_direct_publish(topic, payload, options, span, trace_headers)
  end
rescue Pigeon::Serializer::ValidationError, ArgumentError => e
  handle_validation_error(e, topic)
rescue StandardError => e
  handle_direct_publish_error(e, topic)
end