Module: ActivePublisher

Defined in:
lib/active_publisher.rb,
lib/active_publisher/async.rb,
lib/active_publisher/logging.rb,
lib/active_publisher/version.rb,
lib/active_publisher/connection.rb,
lib/active_publisher/configuration.rb,
lib/active_publisher/async/in_memory_adapter.rb

Defined Under Namespace

Modules: Async, Connection, Logging Classes: Configuration

Constant Summary collapse

VERSION =
"0.1.1"

Class Method Summary collapse

Class Method Details

.configuration ⇒ Object



16
17
18
# File 'lib/active_publisher.rb', line 16

# Returns the memoized ::ActivePublisher::Configuration instance, building it
# on the first call and handing back the same object afterwards.
def self.configuration
  return @configuration if @configuration

  @configuration = ::ActivePublisher::Configuration.new
end

.configure {|configuration| ... } ⇒ Object

Yields:



20
21
22
# File 'lib/active_publisher.rb', line 20

# Yields the object returned by +configuration+ to the given block and returns
# the block's result. Without a block this is a no-op that returns nil.
def self.configure
  return unless block_given?

  yield configuration
end

.publish(route, payload, exchange_name, options = {}) ⇒ Object

Publish a message to RabbitMQ

Parameters:

  • route (String)

    The routing key to use for this message.

  • payload (String)

    The message you are sending. Should already be encoded as a string.

  • exchange_name (String)

    The exchange you want to publish to.

  • options (Hash) (defaults to: {})

    hash to set message parameters (e.g. headers)



30
31
32
33
34
# File 'lib/active_publisher.rb', line 30

# Publishes +payload+ to the exchange named +exchange_name+.
#
# The exchange is obtained through +with_exchange+, and the options passed to
# the exchange's +publish+ call are built by +publishing_options+ from +route+
# and +options+.
def self.publish(route, payload, exchange_name, options = {})
  with_exchange(exchange_name) do |exchange|
    message_options = publishing_options(route, options)
    exchange.publish(payload, message_options)
  end
end

.publish_async(route, payload, exchange_name, options = {}) ⇒ Object

Publish a message asynchronously to RabbitMQ.

Asynchronous is designed to do two things:

  1. Introduce the idea of a durable retry should the RabbitMQ connection disconnect.

  2. Provide a higher-level pattern for fire-and-forget publishing.

Parameters:

  • route (String)

    The routing key to use for this message.

  • payload (String)

    The message you are sending. Should already be encoded as a string.

  • exchange_name (String)

    The exchange you want to publish to.

  • options (Hash) (defaults to: {})

    hash to set message parameters (e.g. headers).



12
13
14
# File 'lib/active_publisher/async.rb', line 12

# Hands the message to the adapter returned by
# ::ActivePublisher::Async.publisher_adapter, forwarding all four arguments
# unchanged, and returns whatever the adapter's +publish+ returns.
def self.publish_async(route, payload, exchange_name, options = {})
  adapter = ::ActivePublisher::Async.publisher_adapter
  adapter.publish(route, payload, exchange_name, options)
end

.publishing_options(route, options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/active_publisher.rb', line 36

# Builds the options hash that is passed to the exchange's +publish+ call.
#
# +:mandatory+ and +:persistent+ default to +false+ when the caller did not
# supply them; +:routing_key+ is always set to +route+.
#
# @param route [String] routing key for the message.
# @param options [Hash] caller-supplied message options (e.g. headers). The
#   hash is copied and never modified.
# @return [Hash] on the Java platform, a hash with +:mandatory+, +:routing_key+
#   and the remaining options nested under +:properties+; otherwise a flat hash
#   of all options.
def self.publishing_options(route, options = {})
  # Work on a copy: the Java branch below deletes keys, which must not leak
  # back into the hash the caller handed in.
  publish_options = options.dup

  # Hash#key? tests for presence. Hash#key (used here previously) does a
  # reverse lookup by *value*, so it returned nil and the caller's explicit
  # :mandatory / :persistent settings were always overwritten with false.
  publish_options[:mandatory] = false unless publish_options.key?(:mandatory)
  publish_options[:persistent] = false unless publish_options.key?(:persistent)
  publish_options[:routing_key] = route

  return publish_options unless ::RUBY_PLATFORM == "java"

  # NOTE(review): presumably the Java client expects message properties nested
  # under :properties — confirm against the client library in use.
  {
    :mandatory => publish_options.delete(:mandatory),
    :routing_key => publish_options.delete(:routing_key),
    :properties => publish_options,
  }
end

.with_exchange(exchange_name) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/active_publisher.rb', line 52

# Opens a fresh channel on the shared connection, yields the topic exchange
# named +exchange_name+ to the block, and always closes the channel afterwards.
#
# When +configuration.publisher_confirms+ is truthy the channel is switched
# into confirm mode before the block runs, and the method waits for confirms
# once the block has finished.
def self.with_exchange(exchange_name)
  channel = ::ActivePublisher::Connection.connection.create_channel

  begin
    if configuration.publisher_confirms
      channel.confirm_select
    end

    yield channel.topic(exchange_name)

    if configuration.publisher_confirms
      channel.wait_for_confirms
    end
  ensure
    # Closing is best-effort: a failure to close must not mask the outcome
    # of the publish itself.
    begin
      channel.close
    rescue StandardError
      nil
    end
  end
end