Class: Freddy

Inherits:
Object
Defined in:
lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/tracing.rb,
lib/freddy/version.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/encoding.rb,
lib/freddy/timeout_error.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb

Defined Under Namespace

Modules: Adapters, Consumers, MessageHandlerAdapters, Producers, Tracing

Classes: Delivery, Encoding, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError

Constant Summary

FREDDY_TOPIC_EXCHANGE_NAME =
'freddy-topic'
DEFAULT_MAX_CONCURRENCY =
4
VERSION =
'2.8.0'

Class Method Details

.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy

Creates a new Freddy instance.

Examples:

Freddy.build(Logger.new($stdout), user: 'thumper', pass: 'howdy')

Parameters:

  • logger (Logger) (defaults to: Logger.new($stdout))

    a logger instance; defaults to a logger that writes to $stdout

  • config (Hash)

    RabbitMQ connection information

Options Hash (**config):

  • :host (String) — default: 'localhost'
  • :port (Integer) — default: 5672
  • :user (String) — default: 'guest'
  • :pass (String) — default: 'guest'
  • :max_concurrency (Integer) — default: 4

Returns:

  • (Freddy)

# File 'lib/freddy.rb', line 32

def self.build(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  connection = Adapters.determine.connect(config)
  new(connection, logger, max_concurrency)
end
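
The signature above combines an optional positional logger, the named keyword max_concurrency:, and a catch-all **config. The following is a minimal sketch of how Ruby distributes those arguments. It uses a hypothetical stand-in method, split_build_args, which is not part of Freddy and does not open any connection:

```ruby
# Hypothetical stand-in that mirrors the parameter list of Freddy.build.
# It only reports how Ruby distributes the arguments; it connects to nothing.
DEFAULT_MAX_CONCURRENCY = 4

def split_build_args(logger = nil, max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  { logger: logger, max_concurrency: max_concurrency, config: config }
end

args = split_build_args(:my_logger, user: 'thumper', pass: 'howdy', max_concurrency: 8)

# The named keyword is captured separately from the connection settings.
p args[:max_concurrency]
p args[:config]
```

Because max_concurrency: is a named keyword, Ruby takes it out before collecting **config, so in the source shown above it is passed to new rather than to the adapter's connect call.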

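.tracer ⇒ Object

Returns the memoized OpenTelemetry tracer registered under the name 'freddy' with the current Freddy::VERSION.

# File 'lib/freddy.rb', line 38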

def self.tracer
  @tracer ||= OpenTelemetry.tracer_provider.tracer('freddy', Freddy::VERSION)
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection to the message queue.

Examples:

freddy.close

# File 'lib/freddy.rb', line 229

def close
  @connection.close
end

#deliver(destination, payload, options = {}) ⇒ void

This method returns an undefined value.

Sends a message to given destination

This is a *send and forget* type of delivery: it sends a message to the given destination and does not wait for a response. This is useful when multiple consumers are listening with #tap_into, or when you do not care about the response.

Examples:

freddy.deliver 'Metrics', user_id: 5, metric: 'signed_in'

Parameters:

  • destination (String)

    the queue name

  • payload (Hash)

    the payload that can be serialized to json

  • options (Hash) (defaults to: {})

    the options for delivery

Options Hash (options):

  • :timeout (Integer) — default: 0

    discards the message if nobody consumes it within the given number of seconds. The message is never discarded if the timeout is set to 0 (the default).

  • :compress (String) — default: nil
    • 'zlib': compresses the payload with zlib

  • :headers (Hash) — default: nil

    Arbitrary headers to add as message metadata



# File 'lib/freddy.rb', line 175

def deliver(destination, payload, options = {})
  timeout = options.fetch(:timeout, 0)
  compression_algorithm = options.fetch(:compress, nil)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout.positive?
  opts[:content_encoding] = compression_algorithm if compression_algorithm
  opts[:headers] = options[:headers] if options[:headers]

  @send_and_forget_producer.produce(destination, payload, opts)
end
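
To make the option handling above easier to try out, here is a standalone copy of the same mapping, wrapped in a hypothetical helper named delivery_properties (the name is an assumption, not a Freddy method). It shows that :timeout is given in seconds and converted to whole milliseconds, and that a timeout of 0 adds no expiration at all:

```ruby
# Standalone copy of the option handling in #deliver, for illustration only.
# `delivery_properties` is a hypothetical helper name, not part of Freddy.
def delivery_properties(options = {})
  timeout = options.fetch(:timeout, 0)
  compression_algorithm = options.fetch(:compress, nil)

  opts = {}
  # Seconds become whole milliseconds; a timeout of 0 sets no expiration.
  opts[:expiration] = (timeout * 1000).to_i if timeout.positive?
  opts[:content_encoding] = compression_algorithm if compression_algorithm
  opts[:headers] = options[:headers] if options[:headers]
  opts
end

p delivery_properties
p delivery_properties(timeout: 1.5, compress: 'zlib')
p delivery_properties(headers: { 'x-request-id' => 'abc' })
```

Fractional timeouts are accepted by this logic because the multiplication happens before the integer conversion.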

#deliver_with_response(destination, payload, options = {}) ⇒ Hash

Sends a message and waits for the response

Examples:

begin
  response = freddy.deliver_with_response 'Users', type: 'fetch_all'
  puts "Got response #{response}"
rescue Freddy::TimeoutError
  puts "Service unavailable"
rescue Freddy::InvalidRequestError => e
  puts "Got error response: #{e.response}"
end

Parameters:

  • destination (String)

    the queue name

  • payload (Hash)

    the payload that can be serialized to json

  • options (Hash) (defaults to: {})

    the options for delivery

Options Hash (options):

  • :timeout (Integer) — default: 3

    raises Freddy::TimeoutError if no response arrives within the given number of seconds

  • :delete_on_timeout (Boolean) — default: true

    discards the message when the timeout error is raised

Returns:

  • (Hash)

    the response

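Raises:

  • (Freddy::TimeoutError)

    if no response arrives within the timeout

  • (Freddy::InvalidRequestError)

    if the responder replies with an error response

# File 'lib/freddy.rb', line 215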

def deliver_with_response(destination, payload, options = {})
  timeout = options.fetch(:timeout, 3)
  delete_on_timeout = options.fetch(:delete_on_timeout, true)

  @send_and_wait_response_producer.produce destination, payload,
                                           timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
end
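
The observable contract of this method (block until a reply arrives, or raise once the timeout elapses) can be sketched without a broker. The example below is not how Freddy implements waiting internally. It is an illustration built on Ruby's standard Queue and Timeout, and the names wait_for_reply and SketchTimeoutError are hypothetical:

```ruby
require 'timeout'

# Hypothetical stand-in error; Freddy's own class is Freddy::TimeoutError.
class SketchTimeoutError < StandardError; end

# Blocks until a reply is pushed onto `reply_queue`,
# or raises once `timeout_in_seconds` has elapsed.
def wait_for_reply(reply_queue, timeout_in_seconds: 3)
  Timeout.timeout(timeout_in_seconds) { reply_queue.pop }
rescue Timeout::Error
  raise SketchTimeoutError, "no response within #{timeout_in_seconds}s"
end

# A reply that arrives in time is returned to the caller.
replies = Queue.new
responder = Thread.new do
  sleep 0.05
  replies << { id: 42 }
end
response = wait_for_reply(replies, timeout_in_seconds: 1)
responder.join

# With nobody replying, the call raises after the timeout.
begin
  wait_for_reply(Queue.new, timeout_in_seconds: 0.1)
  timed_out = false
rescue SketchTimeoutError
  timed_out = true
end
```

Callers of the real method would rescue Freddy::TimeoutError in the same position, as in the documented example.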

#respond_to(destination) {|message, handler| ... } ⇒ #shutdown

Listens and responds to messages

Consumes messages on the given destination. This is useful for messages that have to be processed once and answered with a result.

Examples:

freddy.respond_to 'RegistrationService' do |attributes, handler|
  if id = register(attributes)
    handler.success(id: id)
  else
    handler.error(message: 'Can not do')
  end
end

Parameters:

  • destination (String)

    the queue name

Yield Parameters:

  • message (Hash<Symbol => Object>)

    Received message as a ruby hash with symbolized keys

  • handler (#success, #error)

    Handler for responding to messages. Use handler#success for a successful response and handler#error for an error response.

Returns:

  • (#shutdown)


# File 'lib/freddy.rb', line 80

def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  producer = Producers::ReplyProducer.new(channel, @logger)
  handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

  Consumers::RespondToConsumer.consume(
    **{
      thread_pool: Concurrent::FixedThreadPool.new(@prefetch_buffer_size),
      destination: destination,
      channel: channel,
      handler_adapter_factory: handler_adapter_factory
    },
    &callback
  )
end
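
Since the block only needs an object that responds to #success and #error, its logic can be exercised without a broker. The following sketch assumes a hypothetical test double, RecordingHandler, and replaces the register call from the documented example with a trivial made-up rule. Neither is part of Freddy:

```ruby
# Hypothetical test double: records replies instead of publishing them.
class RecordingHandler
  attr_reader :replies

  def initialize
    @replies = []
  end

  def success(payload)
    @replies << [:success, payload]
  end

  def error(payload)
    @replies << [:error, payload]
  end
end

# Same shape as a block passed to freddy.respond_to.
# The email check is a made-up rule standing in for `register`.
responder = lambda do |attributes, handler|
  if attributes[:email]
    handler.success(id: 1)
  else
    handler.error(message: 'Can not do')
  end
end

ok = RecordingHandler.new
responder.call({ email: 'a@example.com' }, ok)

failed = RecordingHandler.new
responder.call({}, failed)

p ok.replies
p failed.replies
```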

#tap_into(pattern_or_patterns, options = {}) {|message| ... } ⇒ #shutdown

Listens for messages without consuming them

This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.

Examples:

freddy.tap_into 'notifications.*' do |message|
  puts "Notification showed #{message.inspect}"
end

Parameters:

  • pattern_or_patterns (String, Array<String>)

    the destination pattern or a list of patterns. Use the `#` wildcard to match zero or more words and `*` to match exactly one word.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :group (String)

    only one of the listeners in given group will receive a message. All listeners will receive a message if the group is not specified.

  • :durable (Boolean)

    Whether the consumer queue should be durable. Default is `false`. This option can be used only in combination with the `:group` option.

  • :on_exception (Symbol)

    Defines the consumer's behaviour when the callback fails to process a message and raises an exception. Can be one of `:ack`, `:reject` or `:requeue`. `:ack` acknowledges the message and re-raises the exception. `:reject` rejects the message without requeueing it. `:requeue` rejects the message with the `requeue` flag set.

  • :exchange_name (String)

    Exchange to bind to. Default is `freddy-topic`.

Yields:

  • (message)

    Yields received message to the block.

Yield Parameters:

  • payload (Object)

    Yields the received message’s payload.

  • routing_key (String)

    Yields the received message’s routing key.

  • timestamp (Time)

    Yields received message’s timestamp.

Returns:

  • (#shutdown)


# File 'lib/freddy.rb', line 137

def tap_into(pattern_or_patterns, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern_or_patterns}"

  Consumers::TapIntoConsumer.consume(
    **{
      thread_pool: Concurrent::FixedThreadPool.new(@prefetch_buffer_size),
      patterns: Array(pattern_or_patterns),
      channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
      options: options
    },
    &callback
  )
end
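
The wildcard rules described for pattern_or_patterns can be illustrated with a small matcher. This is only a sketch: the matching itself is presumably done by the message broker, not by Freddy, and the sketch assumes that words in a routing key are separated by dots, as in AMQP topic exchanges. The helper names topic_match? and match_words? are hypothetical:

```ruby
# Returns true when `routing_key` matches `pattern`, where `*` stands for
# exactly one word and `#` for zero or more words (words are dot-separated).
def topic_match?(pattern, routing_key)
  match_words?(pattern.split('.'), routing_key.split('.'))
end

def match_words?(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when '#'
    # '#' may absorb any number of words, including none: try each split.
    (0..key_words.length).any? { |n| match_words?(rest, key_words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !key_words.empty? && match_words?(rest, key_words.drop(1))
  else
    key_words.first == head && match_words?(rest, key_words.drop(1))
  end
end

# A single pattern or a list is normalised the same way as in #tap_into.
patterns = Array('notifications.*')

p patterns.any? { |pattern| topic_match?(pattern, 'notifications.email') }
p topic_match?('notifications.*', 'notifications.email.sent')
p topic_match?('notifications.#', 'notifications.email.sent')
p topic_match?('notifications.#', 'notifications')
```

Under these rules, 'notifications.*' matches a key with exactly one word after the prefix, while 'notifications.#' also matches the bare prefix and any deeper key.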