Method: Bunny::Channel#basic_publish

Defined in:
lib/bunny/channel.rb

#basic_publish(payload, exchange, routing_key, opts = {}) ⇒ Bunny::Channel

Publishes a message using the AMQP 0.9.1 basic.publish method. Returns the channel itself (self), so calls can be chained.
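
A minimal usage sketch, not taken from the Bunny documentation. The helper `publish_event` is hypothetical, and the channel can be any object that responds to `basic_publish` with the signature above (with Bunny, typically a channel opened on an existing connection). The option values are illustrative choices, not recommendations.

```ruby
require "json"

# Hypothetical helper (not part of Bunny): serializes an event as JSON and
# publishes it through any object that implements #basic_publish.
def publish_event(channel, exchange, routing_key, event)
  channel.basic_publish(
    JSON.generate(event),
    exchange,                 # an exchange name, or an object responding to #name
    routing_key,
    persistent:   true,       # ask the broker to persist the message
    mandatory:    true,       # have unroutable messages returned
    content_type: "application/json",
    type:         event.fetch(:type).to_s,
    timestamp:    Time.now.to_i
  )
end
```

With a channel `ch` opened elsewhere, a call might look like `publish_event(ch, "", "events", { type: "user.signup", id: 42 })`. The empty string names the AMQP default exchange, which routes by queue name. Because `basic_publish` returns the channel, the helper does too.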

Options Hash (opts):

  • :persistent (Boolean)

    Should the message be persisted to disk? Defaults to true

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time (TTL) after which the message will be deleted. RabbitMQ interprets the value as milliseconds

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    ID of the message this one is correlated with, e.g. the request this message is a reply to

  • :priority (Integer)

    Message priority, 0 to 9. Defaults to 0. RabbitMQ acts on it only for queues declared with a maximum priority; otherwise it is meaningful only to applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID
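
The source listing for this method shows that three options receive defaults when the caller omits them: :delivery_mode (derived from :persistent, which itself defaults to true), :content_type, and :priority. The sketch below mirrors that logic in plain Ruby so it can be run without a broker. `apply_publish_defaults` is a hypothetical name, and the "application/octet-stream" value standing in for DEFAULT_CONTENT_TYPE is an assumption, since this page references the constant without showing its value. Unlike the listing, the sketch copies the hash instead of modifying the caller's.

```ruby
# Hypothetical helper mirroring the defaulting logic in the listing.
# The second parameter stands in for DEFAULT_CONTENT_TYPE (assumed value).
def apply_publish_defaults(opts, default_content_type = "application/octet-stream")
  result = opts.dup                              # leave the caller's hash untouched
  mode = result.fetch(:persistent, true) ? 2 : 1 # 2 = persistent, 1 = transient
  result[:delivery_mode] ||= mode
  result[:content_type]  ||= default_content_type
  result[:priority]      ||= 0
  result
end

defaults  = apply_publish_defaults({})
transient = apply_publish_defaults({ persistent: false })
explicit  = apply_publish_defaults({ persistent: false, delivery_mode: 2 })

puts defaults[:delivery_mode]   # 2: persistent unless the caller opts out
puts transient[:delivery_mode]  # 1: transient
puts explicit[:delivery_mode]   # 2: an explicit :delivery_mode wins over :persistent
```

One consequence of the `||=` assignments in the listing is that an explicit :delivery_mode takes precedence over :persistent.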

Raises:

  • (ArgumentError)

    If routing_key is longer than SHORTSTR_LIMIT characters
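
The ArgumentError comes from the routing key length guard at the top of the source listing. A standalone sketch of that guard follows. `validate_routing_key!` is a hypothetical name, and the default limit of 255 is an assumption based on the AMQP 0.9.1 short string maximum; the value of SHORTSTR_LIMIT is not shown on this page. The guard compares String#size, which counts characters rather than bytes.

```ruby
# Hypothetical helper; the limit defaults to an assumed SHORTSTR_LIMIT of 255.
def validate_routing_key!(routing_key, limit = 255)
  if routing_key && routing_key.size > limit
    raise ArgumentError, "routing key cannot be longer than #{limit} characters"
  end
  routing_key
end

validate_routing_key!("orders.created")  # passes
validate_routing_key!(nil)               # passes: a nil key skips the check

begin
  validate_routing_key!("x" * 256)       # one character over the assumed limit
rescue ArgumentError => e
  warn e.message
end
```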


# File 'lib/bunny/channel.rb', line 644

def basic_publish(payload, exchange, routing_key, opts = {})
  raise_if_no_longer_open!
  raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  mode = if opts.fetch(:persistent, true)
           2
         else
           1
         end

  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0

  if @next_publish_seq_no > 0
    @unconfirmed_set_mutex.synchronize do
      @unconfirmed_set.add(@next_publish_seq_no)
      @next_publish_seq_no += 1
    end
  end

  frames = AMQ::Protocol::Basic::Publish.encode(@id,
    payload,
    opts,
    exchange_name,
    routing_key,
    opts[:mandatory],
    false,
    @connection.frame_max)
  @connection.send_frameset(frames, self)

  self
end
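
The block guarded by `@next_publish_seq_no > 0` in the listing adds each publish to a set of unconfirmed sequence numbers, but only once the counter is positive, presumably after publisher confirms have been enabled on the channel (that step is not shown on this page). A reduced sketch of the bookkeeping follows. The `ConfirmTracker` class is hypothetical and not part of Bunny, and its `enable!` and `ack` methods stand in for logic this page does not show.

```ruby
require "set"

# Hypothetical class illustrating the sequence-number bookkeeping.
class ConfirmTracker
  attr_reader :next_seq_no

  def initialize
    @next_seq_no = 0        # 0 means tracking is off, matching the guard in the listing
    @unconfirmed = Set.new
    @mutex = Mutex.new
  end

  # Hypothetical: start numbering publishes from 1.
  def enable!
    @mutex.synchronize { @next_seq_no = 1 if @next_seq_no.zero? }
  end

  # Mirrors the guarded block in the listing: record, then advance the counter.
  def record_publish
    return unless @next_seq_no > 0
    @mutex.synchronize do
      @unconfirmed.add(@next_seq_no)
      @next_seq_no += 1
    end
  end

  # Hypothetical: drop a sequence number once it has been confirmed.
  def ack(seq_no)
    @mutex.synchronize { @unconfirmed.delete(seq_no) }
  end

  # Sorted snapshot of the sequence numbers still awaiting confirmation.
  def unconfirmed
    @mutex.synchronize { @unconfirmed.to_a.sort }
  end
end
```

The mutex matters because several threads may publish on the same channel object; without it, two publishes could record the same sequence number.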