Class: SongkickQueue::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/songkick_queue/producer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeProducer

Returns a new instance of Producer.



5
6
7
8
9
# File 'lib/songkick_queue/producer.rb', line 5

def initialize
  @client = Client.new
  @reconnect_attempts = 0
  @publish_reconnect_delay = 5.0
end

Instance Attribute Details

#reconnect_attemptsObject

Returns the value of attribute reconnect_attempts.



3
4
5
# File 'lib/songkick_queue/producer.rb', line 3

def reconnect_attempts
  @reconnect_attempts
end

Instance Method Details

#publish(queue_name, payload, options = {}) ⇒ Bunny::Exchange

Serializes the given message and publishes it to the default RabbitMQ exchange

Parameters:

  • queue_name (String)

    to publish to

  • message (#to_json)

    to serialize and enqueue

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :message_id (String)

    to pass through to the consumer (will be logged)

  • :produced_at (String)

    time when the message was created, ISO8601 formatted

Returns:

  • (Bunny::Exchange)

Raises:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/songkick_queue/producer.rb', line 21

def publish(queue_name, payload, options = {})
  message_id = options.fetch(:message_id) { SecureRandom.hex(6) }
  produced_at = options.fetch(:produced_at) { Time.now.utc.iso8601 }

  message = {
    message_id: message_id,
    produced_at: produced_at,
    payload: payload
  }

  message = JSON.generate(message)

  exchange = client
    .default_exchange
    .publish(message, routing_key: String(queue_name))

  logger.info "Published message #{message_id} to '#{queue_name}' at #{produced_at}"

  exchange
rescue Bunny::ConnectionClosedError
  self.reconnect_attempts += 1

  if (reconnect_attempts > config.max_reconnect_attempts)
    fail TooManyReconnectAttemptsError, "Attempted to reconnect more than " +
      "#{config.max_reconnect_attempts} times"
  end

  logger.info "Attempting to reconnect to RabbitMQ, attempt #{reconnect_attempts} " +
    "of #{config.max_reconnect_attempts}"

  wait_for_bunny_session_to_reconnect

  retry
end