Class: Freddy
- Inherits: Object
- Inheritance chain: Object > Freddy
- Defined in:
- lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/timeout_error.rb,
lib/freddy/trace_carrier.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/adapters/march_hare_adapter.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb
Defined Under Namespace
Modules: Adapters, Consumers, MessageHandlerAdapters, Producers
Classes: Delivery, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError, TraceCarrier
Constant Summary
- FREDDY_TOPIC_EXCHANGE_NAME = 'freddy-topic'.freeze
- DEFAULT_MAX_CONCURRENCY = 4
Class Method Summary
- .build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
  Creates a new Freddy instance.
- .trace ⇒ Object
- .trace=(trace) ⇒ Object
Instance Method Summary
- #close ⇒ void
  Closes the connection to the message queue.
- #deliver(destination, payload, options = {}) ⇒ void
  Sends a message to the given destination.
- #deliver_with_response(destination, payload, options = {}) ⇒ Hash
  Sends a message and waits for the response.
- #respond_to(destination) {|message, handler| ... } ⇒ #shutdown
  Listens and responds to messages.
- #tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
  Listens for messages without consuming them.
Class Method Details
.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance
# File 'lib/freddy.rb', line 28
def self.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  OpenTracing.global_tracer ||= OpenTracing::Tracer.new

  connection = Adapters.determine.connect(config)
  new(connection, logger, max_concurrency)
end
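The parameter list of .build mixes an optional positional logger, a keyword with a default, and a catch-all `**config` that is passed on to the adapter. The sketch below only illustrates how Ruby binds those arguments. `sketch_build` is a hypothetical stand-in that opens no connection, and the `host` and `port` keys are assumed examples of connection settings, not keys confirmed by this page.

```ruby
require 'logger'

DEFAULT_MAX_CONCURRENCY = 4

# Hypothetical stand-in with the same parameter list as Freddy.build.
# It returns the bound arguments instead of connecting to a broker.
def sketch_build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  { logger: logger, max_concurrency: max_concurrency, config: config }
end

# Explicit logger, explicit concurrency, remaining keywords land in config.
custom_logger = Logger.new(STDERR)
explicit = sketch_build(custom_logger, max_concurrency: 8, host: 'localhost', port: 5672)

# Only connection settings given: logger and concurrency fall back to defaults.
defaults = sketch_build(host: 'localhost')
```

Any keyword other than `max_concurrency` ends up in `config`, so a misspelled option would be forwarded to the adapter rather than rejected by this signature.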
.trace ⇒ Object
# File 'lib/freddy.rb', line 35
def self.trace
  Thread.current[:freddy_trace]
end
.trace=(trace) ⇒ Object
# File 'lib/freddy.rb', line 39
def self.trace=(trace)
  Thread.current[:freddy_trace] = trace
end
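Both accessors store the trace under `Thread.current[:freddy_trace]`, so a value set in one thread is not visible in another (in Ruby, `Thread#[]` is in fact fiber-local). A minimal sketch of that behaviour, using a hypothetical `SketchTrace` module that mirrors the two methods above; the string trace value is an arbitrary placeholder.

```ruby
# Hypothetical stand-in that mirrors Freddy.trace and Freddy.trace=.
module SketchTrace
  def self.trace
    Thread.current[:freddy_trace]
  end

  def self.trace=(trace)
    Thread.current[:freddy_trace] = trace
  end
end

# Set the trace in the main thread.
SketchTrace.trace = 'trace-main'

# A new thread starts with its own empty storage and sees nil.
seen_in_other_thread = Thread.new { SketchTrace.trace }.value

# The main thread still sees its own value.
seen_in_main_thread = SketchTrace.trace
```

Code that hands work to a thread pool would therefore need to pass the trace along explicitly if the worker is expected to see it.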
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection to the message queue.
# File 'lib/freddy.rb', line 204
def close
  @connection.close
end
#deliver(destination, payload, options = {}) ⇒ void
This method returns an undefined value.
Sends a message to the given destination.
This is a *send and forget* delivery: the message is sent to the given destination and no response is awaited. Use it when several consumers receive the message through #tap_into, or when the response does not matter.
# File 'lib/freddy.rb', line 152
def deliver(destination, payload, options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0

  @send_and_forget_producer.produce(destination, payload, opts)
end
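The only option #deliver reads is `:timeout`. A positive value is multiplied by 1000 and truncated to an integer before being passed on as `:expiration`, which suggests seconds in and milliseconds out; a timeout of 0 (the default) sets no expiration at all. The sketch below isolates that conversion in a hypothetical helper, `sketch_delivery_opts`, without touching a producer.

```ruby
# Hypothetical helper that repeats the option handling of #deliver.
def sketch_delivery_opts(options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  # Only positive timeouts produce an expiration entry.
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0
  opts
end

no_timeout   = sketch_delivery_opts
zero_timeout = sketch_delivery_opts(timeout: 0)
fractional   = sketch_delivery_opts(timeout: 1.5)
whole        = sketch_delivery_opts(timeout: 30)
```

Here `fractional` carries an expiration of 1500 and `whole` one of 30000, while the first two calls return an empty hash.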
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response
# File 'lib/freddy.rb', line 189
def deliver_with_response(destination, payload, options = {})
  timeout = options.fetch(:timeout, 3)
  delete_on_timeout = options.fetch(:delete_on_timeout, true)

  @send_and_wait_response_producer.produce destination, payload, {
    timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
  }
end
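#deliver_with_response reads two options, `:timeout` (default 3) and `:delete_on_timeout` (default true), and renames the first to `:timeout_in_seconds` for the producer. Because the defaults are applied with `Hash#fetch`, an explicit `false` is respected instead of being replaced by the default. A small sketch with a hypothetical helper, `sketch_response_opts`, that reproduces only this option handling:

```ruby
# Hypothetical helper that repeats the option handling of #deliver_with_response.
def sketch_response_opts(options = {})
  {
    timeout_in_seconds: options.fetch(:timeout, 3),
    delete_on_timeout: options.fetch(:delete_on_timeout, true)
  }
end

default_opts = sketch_response_opts
custom_opts  = sketch_response_opts(timeout: 10, delete_on_timeout: false)
```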
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages.
This consumes messages on the given destination. It is useful for messages that must be processed once and then answered with a result.
# File 'lib/freddy.rb', line 81
def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  producer = Producers::ReplyProducer.new(channel, @logger)
  handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

  Consumers::RespondToConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    destination: destination,
    channel: channel,
    handler_adapter_factory: handler_adapter_factory,
    &callback
  )
end
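The block given to #respond_to receives the message and a handler through which the result is sent back. The sketch below shows the general shape of such a block without a broker. Everything in it is a stand-in: `SketchHandler`, its `success` and `error` methods, and `sketch_respond` are assumptions made for illustration and are not documented on this page.

```ruby
# Hypothetical handler; the success/error method names are assumptions.
class SketchHandler
  attr_reader :reply

  def success(payload)
    @reply = { ok: true, payload: payload }
  end

  def error(payload)
    @reply = { ok: false, payload: payload }
  end
end

# Hypothetical stand-in for the consumer: runs the block once for one
# message and returns whatever the block reported through the handler.
def sketch_respond(message)
  handler = SketchHandler.new
  yield message, handler
  handler.reply
end

# A responder block in the |message, handler| shape documented above.
responder = lambda do |message, handler|
  if message[:amount].to_i > 0
    handler.success(charged: message[:amount])
  else
    handler.error(reason: 'amount must be positive')
  end
end

accepted = sketch_respond({ amount: 5 }, &responder)
rejected = sketch_respond({ amount: 0 }, &responder)
```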
#tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them.
This listens for messages on the given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.
# File 'lib/freddy.rb', line 119
def tap_into(pattern, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern}"

  Consumers::TapIntoConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    pattern: pattern,
    channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
    options: options,
    &callback
  )
end
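This page does not spell out the pattern syntax. Since the class declares a topic exchange (FREDDY_TOPIC_EXCHANGE_NAME), one plausible reading is that patterns follow AMQP topic rules, where `*` matches exactly one dot-separated word and `#` matches zero or more. The sketch below implements those standard rules in a hypothetical helper, `topic_match?`, purely to show which destinations a pattern would cover under that assumption; it is not Freddy's own matching code, and the destination names are made up.

```ruby
# Matches a list of pattern words against a list of routing-key words
# using AMQP topic semantics ('*' = one word, '#' = zero or more words).
def match_words(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when '#'
    # Try letting '#' swallow 0, 1, ... up to all remaining words.
    (0..key_words.length).any? { |i| match_words(rest, key_words[i..]) }
  when '*'
    !key_words.empty? && match_words(rest, key_words[1..])
  else
    key_words.first == head && match_words(rest, key_words[1..])
  end
end

# Hypothetical helper: does the destination fall under the pattern?
def topic_match?(pattern, destination)
  match_words(pattern.split('.'), destination.split('.'))
end

one_word   = topic_match?('orders.*', 'orders.created')
too_deep   = topic_match?('orders.*', 'orders.created.eu')
hash_zero  = topic_match?('orders.#', 'orders')
hash_many  = topic_match?('orders.#', 'orders.created.eu')
other_root = topic_match?('orders.#', 'payments.created')
```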