Class: Freddy
- Inherits:
- Object
  - Object
    - Freddy
- Defined in:
- lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/consumers.rb,
lib/freddy/producers.rb,
lib/freddy/timeout_error.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/adapters/march_hare_adapter.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb
Defined Under Namespace
Modules: Adapters, Consumers, MessageHandlerAdapters, Producers
Classes: Delivery, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError
Constant Summary
- FREDDY_TOPIC_EXCHANGE_NAME = 'freddy-topic'.freeze
- DEFAULT_MAX_CONCURRENCY = 4
Class Method Summary
- .build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance.
Instance Method Summary
- #close ⇒ void
Closes the connection to the message queue.
- #deliver(destination, payload, options = {}) ⇒ void
Sends a message to the given destination.
- #deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response.
- #respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages.
- #tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them.
Class Method Details
.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance
    # File 'lib/freddy.rb', line 27
    def self.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
      connection = Adapters.determine.connect(config)
      new(connection, logger, max_concurrency)
    end
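The parameter list of .build mixes an optional positional argument, a named keyword and a catch-all `**config`. The sketch below shows how Ruby binds a call against that shape. `FreddySketch.build_args` is a hypothetical stand-in that only returns what it received, and the `host` and `port` keys are assumed connection settings, not options confirmed by this page.

```ruby
require 'logger'

# Hypothetical stand-in: mirrors the parameter list of Freddy.build
# but opens no connection; it only reports how the arguments were bound.
module FreddySketch
  DEFAULT_MAX_CONCURRENCY = 4

  def self.build_args(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
    { logger: logger, max_concurrency: max_concurrency, config: config }
  end
end

# host/port are assumed keys; every keyword other than max_concurrency
# is collected into config, which .build hands to the adapter's connect.
args = FreddySketch.build_args(Logger.new(nil), host: 'localhost', port: 5672)

puts args[:max_concurrency]  # the default applies because it was not passed
puts args[:config].keys.inspect
```

Because `max_concurrency` is a named keyword, it is never forwarded to the adapter as part of `config`.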
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection to the message queue
    # File 'lib/freddy.rb', line 196
    def close
      @connection.close
    end
#deliver(destination, payload, options = {}) ⇒ void
This method returns an undefined value.
Sends a message to the given destination
This is a *send and forget* type of delivery: the message is sent to the given destination without waiting for a response. This is useful when multiple consumers are listening with #tap_into, or when you do not care about the response.
    # File 'lib/freddy.rb', line 144
    def deliver(destination, payload, options = {})
      timeout = options.fetch(:timeout, 0)
      opts = {}
      opts[:expiration] = (timeout * 1000).to_i if timeout > 0

      @send_and_forget_producer.produce(destination, payload, opts)
    end
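The `:timeout` handling in #deliver can be isolated into a small sketch. `expiration_opts` is a hypothetical helper that repeats the first three lines of the method body; it assumes the timeout is given in seconds and that the multiplication by 1000 yields milliseconds, which the listing suggests but does not state.

```ruby
# Hypothetical helper reproducing the option handling of #deliver.
# Assumption: :timeout is in seconds, :expiration in milliseconds.
def expiration_opts(options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  # An expiration is only set for a positive timeout.
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0
  opts
end

no_timeout   = expiration_opts               # empty Hash, no :expiration key
with_timeout = expiration_opts(timeout: 1.5) # :expiration is 1500
zero_timeout = expiration_opts(timeout: 0)   # empty Hash, same as the default

puts no_timeout.inspect
puts with_timeout.inspect
puts zero_timeout.inspect
```

With the default of 0 no `:expiration` is passed to the producer at all, so an omitted timeout and an explicit `timeout: 0` behave the same.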
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response
    # File 'lib/freddy.rb', line 181
    def deliver_with_response(destination, payload, options = {})
      timeout = options.fetch(:timeout, 3)
      delete_on_timeout = options.fetch(:delete_on_timeout, true)

      @send_and_wait_response_producer.produce destination, payload, {
        timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
      }
    end
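The defaults applied by #deliver_with_response can be sketched on their own. `response_opts` is a hypothetical helper that builds the same Hash the method passes to its producer; it does not send anything.

```ruby
# Hypothetical helper reproducing the defaults of #deliver_with_response.
def response_opts(options = {})
  {
    timeout_in_seconds: options.fetch(:timeout, 3),
    delete_on_timeout: options.fetch(:delete_on_timeout, true)
  }
end

defaults = response_opts
# Hash#fetch only falls back when the key is absent, so an explicit
# false is kept rather than replaced by the default of true.
custom = response_opts(timeout: 10, delete_on_timeout: false)

puts defaults.inspect
puts custom.inspect
```

Using `fetch` rather than `options[:delete_on_timeout] || true` is what allows a caller to switch `delete_on_timeout` off.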
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages
This consumes messages on a given destination. It is useful for messages that have to be processed once, after which a result must be sent back.
    # File 'lib/freddy.rb', line 71
    def respond_to(destination, &callback)
      @logger.info "Listening for requests on #{destination}"

      channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
      producer = Producers::ReplyProducer.new(channel, @logger)
      handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

      Consumers::RespondToConsumer.consume(
        logger: @logger,
        thread_pool: Thread.pool(@prefetch_buffer_size),
        destination: destination,
        channel: channel,
        handler_adapter_factory: handler_adapter_factory,
        &callback
      )
    end
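The block passed to #respond_to receives `message` and `handler`. The sketch below models that request and reply flow in memory so it can run without a broker. `FakeResponder`, its `Handler` and the `handler.success` method name are assumptions made for illustration; this page does not list the methods of the real handler object, and the `'Calculator.add'` destination is invented.

```ruby
# Hypothetical in-memory model of the respond_to / deliver_with_response
# pairing. Not Freddy's implementation: no queue, threads or timeouts.
class FakeResponder
  # Assumed handler interface: collects the reply given by the block.
  Handler = Struct.new(:replies) do
    def success(response)
      replies << response
    end
  end

  def initialize
    @callbacks = {}
  end

  # Registers the block that answers requests for a destination.
  def respond_to(destination, &callback)
    @callbacks[destination] = callback
  end

  # Invokes the registered block once and returns its first reply.
  def deliver_with_response(destination, payload)
    handler = Handler.new([])
    @callbacks.fetch(destination).call(payload, handler)
    handler.replies.first
  end
end

fake = FakeResponder.new
fake.respond_to('Calculator.add') do |message, handler|
  handler.success(sum: message[:a] + message[:b])
end

result = fake.deliver_with_response('Calculator.add', { a: 2, b: 3 })
puts result[:sum]
```

The point of the model is the shape of the block: each request is handled once, and the reply travels back through the handler rather than through the block's return value.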
#tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them
This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.
    # File 'lib/freddy.rb', line 110
    def tap_into(pattern, options = {}, &callback)
      @logger.debug "Tapping into messages that match #{pattern}"

      Consumers::TapIntoConsumer.consume(
        logger: @logger,
        thread_pool: Thread.pool(@prefetch_buffer_size),
        pattern: pattern,
        channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
        options: options,
        &callback
      )
    end
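The `pattern` argument is not described further on this page. Assuming it follows AMQP topic-exchange conventions (suggested by the `'freddy-topic'` exchange name, but not confirmed here), `*` stands for exactly one dot-separated word and `#` for zero or more words. The matcher below is a sketch of that convention only; `topic_match?`, `match_words` and the example destinations are hypothetical and are not part of Freddy.

```ruby
# Hypothetical matcher for AMQP-style topic patterns.
# Assumption: tap_into patterns follow these rules.
def topic_match?(pattern, key)
  match_words(pattern.split('.'), key.split('.'))
end

def match_words(pattern, words)
  # An exhausted pattern matches only an exhausted key.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words; try every split point.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?('notifications.*', 'notifications.email')       # true
puts topic_match?('notifications.*', 'notifications.email.sent')  # false
puts topic_match?('notifications.#', 'notifications.email.sent')  # true
```

Under this assumption a single #tap_into call can observe several destinations at once, which fits the "destination or destinations" wording in the description.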