Class: Pigeon::Publisher

Inherits:
Object
Defined in:
lib/pigeon/publisher.rb

Overview

Publisher class for storing messages in the outbox

Defined Under Namespace

Classes: OutboxMessage

Instance Method Summary

Instance Method Details

#publish(topic:, payload:, key: nil, headers: nil, sync: false, partition: nil) ⇒ OutboxMessage

Publish a message to Kafka via the outbox pattern

Parameters:

  • topic (String)

    Kafka topic

  • payload (Hash, String)

    Message payload

  • key (String, nil) (defaults to: nil)

    Optional message key

  • headers (Hash, nil) (defaults to: nil)

    Optional message headers

  • sync (Boolean) (defaults to: false)

    Whether to attempt immediate publishing

  • partition (Integer, nil) (defaults to: nil)

    Optional specific partition
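
As a rough illustration of how the parameters above combine, the following self-contained sketch mirrors the way the method's source handles the payload and the optional arguments. The helper name `build_message_options` is hypothetical and is not part of Pigeon; it only demonstrates that non-String payloads are serialized to JSON and that options left as nil are omitted.

```ruby
require "json"

# Hypothetical helper (not part of Pigeon): assembles the payload and the
# options hash the same way the publish method's source listing does.
def build_message_options(topic:, payload:, key: nil, headers: nil, partition: nil)
  # Strings pass through untouched; anything else is serialized to JSON.
  serialized = payload.is_a?(String) ? payload : payload.to_json

  # Only the topic is mandatory; nil options are left out entirely.
  options = { topic: topic }
  options[:key] = key if key
  options[:headers] = headers if headers
  options[:partition] = partition if partition

  [serialized, options]
end

serialized, options = build_message_options(
  topic: "orders",
  payload: { order_id: 42 },
  key: "order-42",
  partition: 0
)

puts serialized
p options
```

Because `headers` was not supplied in this call, the resulting options hash contains no `:headers` entry at all, rather than a nil value.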

Returns:

  • (OutboxMessage)

# File 'lib/pigeon/publisher.rb', line 23

def publish(topic:, payload:, key: nil, headers: nil, sync: false, partition: nil)
  # This is a placeholder implementation
  # The actual implementation will be added in task 3.1
  Pigeon.config.logger.info("Message published to topic: #{topic}")

  # Prepare the message for Karafka
  message_payload = payload.is_a?(String) ? payload : payload.to_json

  # If sync is true, attempt to publish immediately using Karafka.
  # Track whether the attempt succeeded so the returned status is accurate.
  published = false
  if sync
    begin
      message_options = { topic: topic }
      message_options[:key] = key if key
      message_options[:headers] = headers if headers
      message_options[:partition] = partition if partition

      # Use Karafka producer to send the message
      Pigeon.karafka_producer.produce_sync(message_payload, **message_options)
      published = true

      Pigeon.config.logger.info("Message published synchronously to topic: #{topic}")
    rescue StandardError => e
      Pigeon.config.logger.error("Failed to publish message synchronously: #{e.message}")
      # Fall through: the message stays pending in the outbox
    end
  end

  # Return a mock outbox message for now
  # In the actual implementation, this will be stored in the database
  OutboxMessage.new(
    id: SecureRandom.uuid,
    topic: topic,
    key: key,
    headers: headers,
    message_partition: partition,
    payload: payload,
    # Only report "processed" when the synchronous publish actually succeeded
    status: published ? "processed" : "pending",
    created_at: Time.now
  )
end
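
The listing above is marked as a placeholder and returns a mock record rather than persisting anything. For orientation, the outbox pattern it refers to can be sketched roughly as follows: a message is first recorded with status "pending", and a separate relay step later publishes pending records and marks them "processed". Everything in this sketch is an assumption made for illustration: `OutboxRecord`, `FakeProducer`, `enqueue`, and `relay` are hypothetical names, an in-memory array stands in for the database table, and none of it reflects Pigeon's actual implementation.

```ruby
require "json"
require "securerandom"

# Hypothetical stand-ins for illustration; none of these names come from Pigeon.
OutboxRecord = Struct.new(:id, :topic, :payload, :status, keyword_init: true)

# Fake producer that records what it was asked to send instead of using Kafka.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def produce_sync(topic:, payload:)
    @sent << { topic: topic, payload: payload }
  end
end

# Step 1: record the message as pending instead of sending it right away.
def enqueue(outbox, topic:, payload:)
  record = OutboxRecord.new(
    id: SecureRandom.uuid,
    topic: topic,
    payload: payload.is_a?(String) ? payload : payload.to_json,
    status: "pending"
  )
  outbox << record
  record
end

# Step 2: a relay publishes pending records and marks them processed.
# A record whose publish raises stays pending and is retried on the next run.
def relay(outbox, producer)
  outbox.select { |r| r.status == "pending" }.each do |record|
    begin
      producer.produce_sync(topic: record.topic, payload: record.payload)
      record.status = "processed"
    rescue StandardError
      next
    end
  end
end

outbox = []
producer = FakeProducer.new
enqueue(outbox, topic: "orders", payload: { order_id: 42 })
relay(outbox, producer)

puts outbox.first.status
```

Keeping the write to the outbox separate from the publish step means a producer failure leaves the record in place for a later retry instead of losing the message.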