Module: ActivePublisher
- Defined in:
  - lib/active_publisher.rb
  - lib/active_publisher/async.rb
  - lib/active_publisher/logging.rb
  - lib/active_publisher/version.rb
  - lib/active_publisher/connection.rb
  - lib/active_publisher/configuration.rb
  - lib/active_publisher/async/in_memory_adapter.rb
Defined Under Namespace
Modules: Async, Connection, Logging
Classes: Configuration
Constant Summary collapse
- VERSION = "0.1.1"
Class Method Summary collapse
- .configuration ⇒ Object
- .configure {|configuration| ... } ⇒ Object
- .publish(route, payload, exchange_name, options = {}) ⇒ Object
  Publish a message to RabbitMQ.
- .publish_async(route, payload, exchange_name, options = {}) ⇒ Object
  Publish a message asynchronously to RabbitMQ.
- .publishing_options(route, options = {}) ⇒ Object
- .with_exchange(exchange_name) ⇒ Object
Class Method Details
.configuration ⇒ Object
16 17 18 |
# File 'lib/active_publisher.rb', line 16 def self.configuration @configuration ||= ::ActivePublisher::Configuration.new end |
.configure {|configuration| ... } ⇒ Object
20 21 22 |
# File 'lib/active_publisher.rb', line 20 def self.configure yield(configuration) if block_given? end |
.publish(route, payload, exchange_name, options = {}) ⇒ Object
Publish a message to RabbitMQ
30 31 32 33 34 |
# File 'lib/active_publisher.rb', line 30 def self.publish(route, payload, exchange_name, options = {}) with_exchange(exchange_name) do |exchange| exchange.publish(payload, publishing_options(route, options)) end end |
.publish_async(route, payload, exchange_name, options = {}) ⇒ Object
Publish a message asynchronously to RabbitMQ.
Asynchronous publishing is designed to do two things:
- Introduce the idea of a durable retry should the RabbitMQ connection disconnect.
- Provide a higher-level pattern for fire-and-forget publishing.
12 13 14 |
# File 'lib/active_publisher/async.rb', line 12 def self.publish_async(route, payload, exchange_name, options = {}) ::ActivePublisher::Async.publisher_adapter.publish(route, payload, exchange_name, options) end |
.publishing_options(route, options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/active_publisher.rb', line 36 def self.publishing_options(route, options = {}) options[:mandatory] = false unless options.key(:mandatory) options[:persistent] = false unless options.key(:persistent) options[:routing_key] = route if ::RUBY_PLATFORM == "java" java_options = {} java_options[:mandatory] = options.delete(:mandatory) java_options[:routing_key] = options.delete(:routing_key) java_options[:properties] = options java_options else options end end |
.with_exchange(exchange_name) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/active_publisher.rb', line 52 def self.with_exchange(exchange_name) connection = ::ActivePublisher::Connection.connection channel = connection.create_channel begin channel.confirm_select if configuration.publisher_confirms exchange = channel.topic(exchange_name) yield(exchange) channel.wait_for_confirms if configuration.publisher_confirms ensure channel.close rescue nil end end |