Class: Freddy
- Inherits: Object
- Hierarchy: Object < Freddy
- Defined in:
- lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/timeout_error.rb,
lib/freddy/trace_carrier.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/adapters/march_hare_adapter.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb
Defined Under Namespace
Modules: Adapters, Consumers, MessageHandlerAdapters, Producers
Classes: Delivery, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError, TraceCarrier
Constant Summary
- FREDDY_TOPIC_EXCHANGE_NAME = 'freddy-topic'.freeze
- DEFAULT_MAX_CONCURRENCY = 4
Class Method Summary
- .build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance.
- .trace ⇒ Object
Deprecated. Use OpenTracing.active_span instead.
- .trace=(trace) ⇒ Object
Deprecated. Use OpenTracing ScopeManager instead.
Instance Method Summary
- #close ⇒ void
Closes the connection to the message queue.
- #deliver(destination, payload, options = {}) ⇒ void
Sends a message to the given destination.
- #deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response.
- #respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages.
- #tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them.
Class Method Details
.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy
Creates a new freddy instance
# File 'lib/freddy.rb', line 28

def self.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  OpenTracing.global_tracer ||= OpenTracing::Tracer.new
  connection = Adapters.determine.connect(config)
  new(connection, logger, max_concurrency)
end
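The signature above mixes an optional positional argument, one named keyword, and a catch-all `**config`. The following is a minimal sketch of how Ruby splits such a call. `split_build_args` is a hypothetical stand-in that copies only the signature and never opens a connection, and the `host` and `port` keys are illustrative names, not confirmed configuration options.

```ruby
require 'logger'

DEFAULT_MAX_CONCURRENCY = 4

# Hypothetical stand-in that mirrors the signature of Freddy.build.
# It only reports how Ruby splits the arguments; no connection is made.
def split_build_args(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  { logger: logger, max_concurrency: max_concurrency, config: config }
end

# Keywords other than max_concurrency are collected into config,
# and the logger falls back to its default because none was passed.
args = split_build_args(max_concurrency: 8, host: 'localhost', port: 5672)
puts args[:max_concurrency]
puts args[:config].inspect
```

Under this reading, everything collected in `config` is what `.build` forwards to `Adapters.determine.connect(config)`.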
.trace ⇒ Object
Deprecated. Use OpenTracing.active_span instead.
# File 'lib/freddy.rb', line 36

def self.trace
  OpenTracing.active_span
end
.trace=(trace) ⇒ Object
Deprecated. Use OpenTracing ScopeManager instead.
# File 'lib/freddy.rb', line 41

def self.trace=(trace)
  if OpenTracing.active_span != trace
    OpenTracing.scope_manager.activate(trace)
  end
end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection to the message queue.
# File 'lib/freddy.rb', line 208

def close
  @connection.close
end
#deliver(destination, payload, options = {}) ⇒ void
This method returns an undefined value.
Sends a message to the given destination.
This is a *send and forget* delivery: the message is sent to the given destination and no response is awaited. It is useful when multiple consumers listen through #tap_into, or when the response does not matter.
# File 'lib/freddy.rb', line 156

def deliver(destination, payload, options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0

  @send_and_forget_producer.produce(destination, payload, opts)
end
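The timeout handling in `#deliver` can be tried in isolation. This sketch copies only the option logic from the listing; `expiration_options` is a hypothetical helper name, not part of Freddy.

```ruby
# Mirrors the option handling shown in #deliver: a positive timeout in
# seconds becomes an :expiration in whole milliseconds; a missing or
# zero timeout sets no expiration at all.
def expiration_options(options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0
  opts
end

puts expiration_options.inspect
puts expiration_options(timeout: 1.5).inspect
```

Because the default timeout is 0, a plain `deliver` call passes no expiration to the producer.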
#deliver_with_response(destination, payload, options = {}) ⇒ Hash
Sends a message and waits for the response.
# File 'lib/freddy.rb', line 193

def deliver_with_response(destination, payload, options = {})
  timeout = options.fetch(:timeout, 3)
  delete_on_timeout = options.fetch(:delete_on_timeout, true)

  @send_and_wait_response_producer.produce destination, payload, {
    timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
  }
end
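Unlike `#deliver`, this method applies non-zero defaults. The sketch below reproduces just the defaulting step from the listing; `response_options` is a hypothetical helper name used for illustration.

```ruby
# Mirrors the defaults shown in #deliver_with_response: wait 3 seconds
# unless told otherwise, and delete the request on timeout by default.
def response_options(options = {})
  {
    timeout_in_seconds: options.fetch(:timeout, 3),
    delete_on_timeout: options.fetch(:delete_on_timeout, true)
  }
end

puts response_options.inspect
puts response_options(timeout: 10, delete_on_timeout: false).inspect
```

Since `fetch` is used rather than `||`, an explicit `delete_on_timeout: false` is respected instead of being replaced by the default.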
#respond_to(destination) {|message, handler| ... } ⇒ #shutdown
Listens and responds to messages.
This consumes messages on the given destination. It is useful for messages that must be processed once and then answered with a result.
# File 'lib/freddy.rb', line 85

def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  producer = Producers::ReplyProducer.new(channel, @logger)
  handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

  Consumers::RespondToConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    destination: destination,
    channel: channel,
    handler_adapter_factory: handler_adapter_factory,
    &callback
  )
end
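The block given to `#respond_to` receives a message and a handler. The sketch below shows the shape of such a callback without a broker. `RecordingHandler` is a hypothetical stand-in, and both the `success`/`error` method names and the symbol-keyed message hash are assumptions that this page does not confirm.

```ruby
# Hypothetical stand-in for the handler yielded by #respond_to.
# The real handler is built by Freddy; success/error are assumed names.
class RecordingHandler
  attr_reader :replies

  def initialize
    @replies = []
  end

  def success(payload)
    @replies << [:success, payload]
  end

  def error(payload)
    @replies << [:error, payload]
  end
end

# A callback with the same two-argument shape as the #respond_to block.
callback = lambda do |message, handler|
  if message.key?(:user_id)
    handler.success(id: message[:user_id])
  else
    handler.error(error: 'user_id is required')
  end
end

handler = RecordingHandler.new
callback.call({ user_id: 7 }, handler)
callback.call({}, handler)
puts handler.replies.inspect
```

Keeping the callback as a plain lambda like this makes it possible to unit-test the reply logic separately from the messaging layer.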
#tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown
Listens for messages without consuming them.
This listens for messages on the given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.
# File 'lib/freddy.rb', line 123

def tap_into(pattern, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern}"

  Consumers::TapIntoConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    pattern: pattern,
    channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
    options: options,
    &callback
  )
end
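The `pattern` argument can cover several destinations. Assuming it follows the standard AMQP topic-exchange wildcards (which the `freddy-topic` exchange name suggests, though this page does not state it), `*` stands for exactly one dot-separated word and `#` for zero or more. The matcher below is an illustrative reimplementation of those rules; `topic_match?` and `match_words` are hypothetical helpers, not Freddy methods, and the broker performs the real matching.

```ruby
# Returns true when routing_key matches an AMQP-style topic pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Recursive word-by-word comparison of pattern and routing key.
def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words, so try every possible split.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?('notifications.*', 'notifications.email')
puts topic_match?('notifications.#', 'notifications.email.sent')
```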