Class: Freddy
- Inherits: Object
  - Object
    - Freddy
- Defined in:
- lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/tracing.rb,
lib/freddy/version.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/encoding.rb,
lib/freddy/timeout_error.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb
Defined Under Namespace
Modules: Adapters, Consumers, MessageHandlerAdapters, Producers, Tracing
Classes: Delivery, Encoding, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError
Constant Summary
- FREDDY_TOPIC_EXCHANGE_NAME = 'freddy-topic'
- DEFAULT_MAX_CONCURRENCY = 4
- VERSION = '2.8.0'
Class Method Summary
- .build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
  Creates a new freddy instance.
- .tracer ⇒ Object
Instance Method Summary
- #close ⇒ void
  Closes the connection to the message queue.
- #deliver(destination, payload, options = {}) ⇒ void
  Sends a message to the given destination.
- #deliver_with_response(destination, payload, options = {}) ⇒ Hash
  Sends a message and waits for the response.
- #respond_to(destination) {|message, handler| ... } ⇒ #shutdown
  Listens and responds to messages.
- #tap_into(pattern_or_patterns, options = {}) {|message| ... } ⇒ #shutdown
  Listens for messages without consuming them.
Class Method Details
.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance
    # File 'lib/freddy.rb', line 32
    def self.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
      connection = Adapters.determine.connect(config)
      new(connection, logger, max_concurrency)
    end
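The signature takes the logger positionally, `max_concurrency` as a keyword, and gathers every remaining keyword into `config`, which is passed to the adapter's `connect`. The sketch below uses a stand-in method with the same parameter list to show how a call is split; it does not open a connection. The helper name `split_build_args` and the `host` and `port` keys are illustrative assumptions, since this page does not list the accepted connection options.

```ruby
require 'logger'

# Stand-in with the same parameter list as Freddy.build (hypothetical helper,
# not part of the gem). The literal 4 mirrors DEFAULT_MAX_CONCURRENCY.
# It returns what each parameter captured instead of connecting.
def split_build_args(logger = Logger.new($stdout), max_concurrency: 4, **config)
  { logger: logger, max_concurrency: max_concurrency, config: config }
end

# Only connection settings given: logger and concurrency use their defaults.
# The :host and :port keys are assumed for illustration.
with_defaults = split_build_args(host: 'localhost', port: 5672)

# Explicit logger and concurrency; the remaining keywords still land in config.
customized = split_build_args(Logger.new($stderr), max_concurrency: 8, host: 'localhost')
```

Under these assumptions a real call would have the same shape, for example `Freddy.build(logger, max_concurrency: 8, host: 'localhost')`.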
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection to the message queue
    # File 'lib/freddy.rb', line 229
    def close
      @connection.close
    end
#deliver(destination, payload, options = {}) ⇒ void
This method returns an undefined value.
Sends a message to the given destination
This is a *send and forget* type of delivery. It sends a message to the given destination and does not wait for a response. This is useful when multiple consumers are listening with #tap_into, or when you do not care about the response.
    # File 'lib/freddy.rb', line 175
    def deliver(destination, payload, options = {})
      timeout = options.fetch(:timeout, 0)
      compression_algorithm = options.fetch(:compress, nil)

      opts = {}
      opts[:expiration] = (timeout * 1000).to_i if timeout.positive?
      opts[:content_encoding] = compression_algorithm if compression_algorithm
      opts[:headers] = options[:headers] if options[:headers]
      @send_and_forget_producer.produce(destination, payload, opts)
    end
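The option handling in #deliver can be tried in isolation. The sketch below copies that logic into a hypothetical helper, `publish_options_for`, so the mapping from caller options to publish options is visible without a broker. The `'zlib'` value and the header name are placeholder assumptions, not values confirmed by this page.

```ruby
# Hypothetical helper mirroring the option handling in #deliver.
def publish_options_for(options = {})
  timeout = options.fetch(:timeout, 0)
  compression_algorithm = options.fetch(:compress, nil)

  opts = {}
  # :timeout is multiplied by 1000 and truncated to an integer expiration;
  # a timeout of 0 (the default) sets no expiration at all.
  opts[:expiration] = (timeout * 1000).to_i if timeout.positive?
  opts[:content_encoding] = compression_algorithm if compression_algorithm
  opts[:headers] = options[:headers] if options[:headers]
  opts
end

no_options   = publish_options_for({})
with_timeout = publish_options_for(timeout: 1.5)
# 'zlib' and 'x-request-id' are placeholders chosen for illustration.
with_extras  = publish_options_for(compress: 'zlib', headers: { 'x-request-id' => 'abc' })
```

Only options that are actually set reach the producer, so a call without options publishes with an empty options hash.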
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response
    # File 'lib/freddy.rb', line 215
    def deliver_with_response(destination, payload, options = {})
      timeout = options.fetch(:timeout, 3)
      delete_on_timeout = options.fetch(:delete_on_timeout, true)

      @send_and_wait_response_producer.produce destination, payload, timeout_in_seconds: timeout,
                                                                     delete_on_timeout: delete_on_timeout
    end
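The defaults in #deliver_with_response are resolved with `Hash#fetch`. The sketch below reproduces that step in a hypothetical helper, `response_options_for`, to show which values the producer receives when options are omitted or overridden.

```ruby
# Hypothetical helper mirroring how #deliver_with_response resolves options.
def response_options_for(options = {})
  {
    timeout_in_seconds: options.fetch(:timeout, 3),
    delete_on_timeout: options.fetch(:delete_on_timeout, true)
  }
end

default_opts = response_options_for({})
# An explicit false is preserved, because fetch only falls back to the
# default when the key is absent.
keep_on_timeout = response_options_for(timeout: 10, delete_on_timeout: false)
```

Using `fetch` rather than `options[:delete_on_timeout] || true` matters here: the `||` form would turn an explicit `false` back into `true`.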
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages
This consumes messages on the given destination. It is useful for messages that must be processed exactly once and whose result must be sent back to the sender.
    # File 'lib/freddy.rb', line 80
    def respond_to(destination, &callback)
      @logger.info "Listening for requests on #{destination}"

      channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
      producer = Producers::ReplyProducer.new(channel, @logger)
      handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

      Consumers::RespondToConsumer.consume(
        **{
          thread_pool: Concurrent::FixedThreadPool.new(@prefetch_buffer_size),
          destination: destination,
          channel: channel,
          handler_adapter_factory: handler_adapter_factory
        },
        &callback
      )
    end
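The block passed to #respond_to receives the message and a handler. The sketch below exercises such a block without a broker by calling it with a stand-in handler. This page does not document the handler's methods, so `success` and `error` are assumptions here, as are the `RecordingHandler` class, the `:numbers` key, and the use of symbol keys in the message.

```ruby
# Stand-in for the handler yielded to the block. Assumption: the real handler
# exposes #success and #error; this class only records what was sent.
class RecordingHandler
  attr_reader :replies

  def initialize
    @replies = []
  end

  def success(response = nil)
    @replies << [:success, response]
  end

  def error(response = nil)
    @replies << [:error, response]
  end
end

# The body you would pass as the respond_to block, written as a lambda so it
# can be called directly.
sum_responder = lambda do |message, handler|
  if message[:numbers].is_a?(Array)
    handler.success(sum: message[:numbers].sum)
  else
    handler.error(error: 'numbers must be an array')
  end
end

handler = RecordingHandler.new
sum_responder.call({ numbers: [1, 2, 3] }, handler)
sum_responder.call({}, handler)
```

Keeping the block body in a lambda like this makes the request logic testable on its own; with a live connection it could be passed as `freddy.respond_to(destination, &sum_responder)`.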
#tap_into(pattern_or_patterns, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them
This listens for messages on the given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.
    # File 'lib/freddy.rb', line 137
    def tap_into(pattern_or_patterns, options = {}, &callback)
      @logger.debug "Tapping into messages that match #{pattern_or_patterns}"

      Consumers::TapIntoConsumer.consume(
        **{
          thread_pool: Concurrent::FixedThreadPool.new(@prefetch_buffer_size),
          patterns: Array(pattern_or_patterns),
          channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
          options: options
        },
        &callback
      )
    end
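#tap_into accepts either one pattern or a list because it normalizes its first argument with `Array(...)`. The sketch below shows that normalization on its own. The pattern strings are made-up examples.

```ruby
# Kernel#Array wraps a single value and leaves an existing array unchanged,
# which is how tap_into accepts both forms. Pattern names are illustrative.
single_pattern    = Array('users.created')
multiple_patterns = Array(['users.*', 'orders.created'])
no_pattern        = Array(nil)
```

Because of this, `tap_into('users.created')` and `tap_into(['users.created'])` hand the consumer the same `patterns` value, while `nil` becomes an empty list.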