Class: Freddy
- Inherits:
-
Object
- Object
- Freddy
- Defined in:
- lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/tracing.rb,
lib/freddy/version.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/encoding.rb,
lib/freddy/timeout_error.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb
Defined Under Namespace
Modules: Adapters, Consumers, MessageHandlerAdapters, Producers, Tracing Classes: Delivery, Encoding, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError
Constant Summary collapse
- FREDDY_TOPIC_EXCHANGE_NAME =
'freddy-topic'- DEFAULT_MAX_CONCURRENCY =
4- VERSION =
'2.1.0'
Class Method Summary collapse
-
.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance.
- .tracer ⇒ Object
Instance Method Summary collapse
-
#close ⇒ void
Closes the connection with message queue.
-
#deliver(destination, payload, options = {}) ⇒ void
Sends a message to given destination.
-
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response.
-
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages.
-
#tap_into(pattern_or_patterns, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them.
Class Method Details
.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance
32 33 34 35 |
# File 'lib/freddy.rb', line 32 def self.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) connection = Adapters.determine.connect(config) new(connection, logger, max_concurrency) end |
.tracer ⇒ Object
38 39 40 |
# File 'lib/freddy.rb', line 38 def self.tracer @tracer ||= OpenTelemetry.tracer_provider.tracer('freddy', Freddy::VERSION) end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection with message queue
214 215 216 |
# File 'lib/freddy.rb', line 214 def close @connection.close end |
#deliver(destination, payload, options = {}) ⇒ void
This method returns an undefined value.
Sends a message to given destination
This is *send and forget* type of delivery. It sends a message to given destination and does not wait for response. This is useful when there are multiple consumers that are using #tap_into or you just do not care about the response.
161 162 163 164 165 166 167 168 169 |
# File 'lib/freddy.rb', line 161 def deliver(destination, payload, = {}) timeout = .fetch(:timeout, 0) compression_algorithm = .fetch(:compress, nil) opts = {} opts[:expiration] = (timeout * 1000).to_i if timeout.positive? opts[:content_encoding] = compression_algorithm if compression_algorithm @send_and_forget_producer.produce(destination, payload, opts) end |
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response
200 201 202 203 204 205 206 |
# File 'lib/freddy.rb', line 200 def deliver_with_response(destination, payload, = {}) timeout = .fetch(:timeout, 3) delete_on_timeout = .fetch(:delete_on_timeout, true) @send_and_wait_response_producer.produce destination, payload, timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout end |
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages
This consumes messages on a given destination. It is useful for messages that have to be processed once and then a result must be sent.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/freddy.rb', line 80 def respond_to(destination, &callback) @logger.info "Listening for requests on #{destination}" channel = @connection.create_channel(prefetch: @prefetch_buffer_size) producer = Producers::ReplyProducer.new(channel, @logger) handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer) Consumers::RespondToConsumer.consume( thread_pool: Thread.pool(@prefetch_buffer_size), destination: destination, channel: channel, handler_adapter_factory: handler_adapter_factory, &callback ) end |
#tap_into(pattern_or_patterns, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them
This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested.
127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/freddy.rb', line 127 def tap_into(pattern_or_patterns, = {}, &callback) @logger.debug "Tapping into messages that match #{pattern_or_patterns}" Consumers::TapIntoConsumer.consume( thread_pool: Thread.pool(@prefetch_buffer_size), patterns: Array(pattern_or_patterns), channel: @connection.create_channel(prefetch: @prefetch_buffer_size), options: , &callback ) end |