Class: Freddy

Inherits:
Object
  • Object
Defined in:
lib/freddy.rb,
lib/freddy/payload.rb,
lib/freddy/adapters.rb,
lib/freddy/delivery.rb,
lib/freddy/timeout_error.rb,
lib/freddy/trace_carrier.rb,
lib/freddy/error_response.rb,
lib/freddy/message_handler.rb,
lib/freddy/request_manager.rb,
lib/freddy/responder_handler.rb,
lib/freddy/invalid_request_error.rb,
lib/freddy/adapters/bunny_adapter.rb,
lib/freddy/sync_response_container.rb,
lib/freddy/producers/reply_producer.rb,
lib/freddy/message_handler_adapaters.rb,
lib/freddy/adapters/march_hare_adapter.rb,
lib/freddy/consumers/response_consumer.rb,
lib/freddy/consumers/tap_into_consumer.rb,
lib/freddy/consumers/respond_to_consumer.rb,
lib/freddy/producers/send_and_forget_producer.rb,
lib/freddy/producers/send_and_wait_response_producer.rb

Defined Under Namespace

Modules: Adapters, Consumers, MessageHandlerAdapters, Producers

Classes: Delivery, ErrorResponse, InvalidRequestError, MessageHandler, Payload, RequestManager, ResponderHandler, SyncResponseContainer, TimeoutError, TraceCarrier

Constant Summary

FREDDY_TOPIC_EXCHANGE_NAME = 'freddy-topic'.freeze
DEFAULT_MAX_CONCURRENCY = 4

Class Method Summary

Instance Method Summary

Class Method Details

.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config) ⇒ Freddy

Creates a new Freddy instance

Examples:

Freddy.build(Logger.new(STDOUT), user: 'thumper', pass: 'howdy')

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))

    instance of a logger, defaults to the STDOUT logger

  • config (Hash)

    rabbitmq connection information

Options Hash (**config):

  • :host (String) — default: 'localhost'
  • :port (Integer) — default: 5672
  • :user (String) — default: 'guest'
  • :pass (String) — default: 'guest'
  • :max_concurrency (Integer) — default: 4

Returns:

  • (Freddy)

# File 'lib/freddy.rb', line 28

def self.build(logger = Logger.new(STDOUT), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  OpenTracing.global_tracer ||= OpenTracing::Tracer.new

  connection = Adapters.determine.connect(config)
  new(connection, logger, max_concurrency)
end
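
The signature above takes `max_concurrency:` as its own keyword argument, so it is not part of the `**config` hash that is handed to the adapter. A minimal sketch of how Ruby splits the arguments, assuming a hypothetical stand-in method `split_build_args` that copies the signature but opens no connection:

```ruby
require 'logger'

# Same value as Freddy::DEFAULT_MAX_CONCURRENCY, redefined so the sketch
# runs without the gem.
DEFAULT_MAX_CONCURRENCY = 4

# Hypothetical stand-in with the same signature as Freddy.build. It only
# reports how the arguments were split; it does not connect to RabbitMQ.
def split_build_args(logger = Logger.new($stdout), max_concurrency: DEFAULT_MAX_CONCURRENCY, **config)
  { logger: logger, max_concurrency: max_concurrency, config: config }
end

args = split_build_args(Logger.new($stdout), user: 'thumper', pass: 'howdy', max_concurrency: 8)

puts args[:max_concurrency]   # the named keyword is captured on its own
puts args[:config].keys.inspect # only :user and :pass remain in config
```

Only the remaining keys (here `:user` and `:pass`) would reach `Adapters.determine.connect(config)` in the real method.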

.trace ⇒ Object



# File 'lib/freddy.rb', line 35

def self.trace
  Thread.current[:freddy_trace]
end

.trace=(trace) ⇒ Object



# File 'lib/freddy.rb', line 39

def self.trace=(trace)
  Thread.current[:freddy_trace] = trace
end
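
Both accessors read and write `Thread.current[:freddy_trace]`, so the stored trace is local to the current thread and is not visible from other threads. A small sketch of that behaviour, assuming a hypothetical module `TraceSlot` that stands in for the two class methods:

```ruby
# Hypothetical stand-in for Freddy.trace / Freddy.trace=, using the same
# Thread.current slot as the source shown above.
module TraceSlot
  def self.trace
    Thread.current[:freddy_trace]
  end

  def self.trace=(trace)
    Thread.current[:freddy_trace] = trace
  end
end

TraceSlot.trace = 'main-trace'

# A new thread starts with its own empty set of thread-local values.
seen_in_other_thread = Thread.new { TraceSlot.trace }.value

puts TraceSlot.trace.inspect       # "main-trace"
puts seen_in_other_thread.inspect  # nil
```

Code that hands work to another thread would therefore need to pass the trace along explicitly if it wants the same value there.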

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection to the message queue

Examples:

freddy.close


# File 'lib/freddy.rb', line 204

def close
  @connection.close
end

#deliver(destination, payload, options = {}) ⇒ void

This method returns an undefined value.

Sends a message to the given destination

This is a *send and forget* type of delivery. It sends a message to the given destination and does not wait for a response. This is useful when multiple consumers are using #tap_into or when you do not care about the response.

Examples:

freddy.deliver 'Metrics', user_id: 5, metric: 'signed_in'

Parameters:

  • destination (String)

    the queue name

  • payload (Hash)

    the payload that can be serialized to json

  • options (Hash) (defaults to: {})

    the options for delivery

Options Hash (options):

  • :timeout (Integer) — default: 0

    discards the message after the given number of seconds if nobody consumes it. The message won’t be discarded if timeout is set to 0 (default).



# File 'lib/freddy.rb', line 152

def deliver(destination, payload, options = {})
  timeout = options.fetch(:timeout, 0)
  opts = {}
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0

  @send_and_forget_producer.produce(destination, payload, opts)
end
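
The source above converts the `:timeout` option from seconds to milliseconds and sets `:expiration` only when the timeout is positive. A minimal sketch of that conversion, assuming a hypothetical helper `expiration_options` that is not part of Freddy:

```ruby
# Hypothetical helper that mirrors the option handling in #deliver.
# It returns the options hash that would be passed to the producer.
def expiration_options(timeout)
  opts = {}
  # Seconds become whole milliseconds; a timeout of 0 adds no expiration.
  opts[:expiration] = (timeout * 1000).to_i if timeout > 0
  opts
end

no_expiry = expiration_options(0)    # empty hash, message never expires
short     = expiration_options(1.5)  # :expiration is 1500
long      = expiration_options(30)   # :expiration is 30000

puts no_expiry.empty?
puts short[:expiration]
puts long[:expiration]
```

Fractional timeouts are accepted because the value is multiplied before `to_i` truncates it.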

#deliver_with_response(destination, payload, options = {}) ⇒ Hash

Sends a message and waits for the response

Examples:

begin
  response = freddy.deliver_with_response 'Users', type: 'fetch_all'
  puts "Got response #{response}"
rescue Freddy::TimeoutError
  puts "Service unavailable"
rescue Freddy::InvalidRequestError => e
  puts "Got error response: #{e.response}"
end

Parameters:

  • destination (String)

    the queue name

  • payload (Hash)

    the payload that can be serialized to json

  • options (Hash) (defaults to: {})

    the options for delivery

Options Hash (options):

  • :timeout (Integer) — default: 3

    raises a timeout error after the given number of seconds if there is no response

  • :delete_on_timeout (Boolean) — default: true

    discards the message when timeout error is raised

Returns:

  • (Hash)

    the response

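Raises:

  • (Freddy::TimeoutError)

    if no response arrives within the timeout

  • (Freddy::InvalidRequestError)

    if the responder replies with an error response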

# File 'lib/freddy.rb', line 189

def deliver_with_response(destination, payload, options = {})
  timeout = options.fetch(:timeout, 3)
  delete_on_timeout = options.fetch(:delete_on_timeout, true)

  @send_and_wait_response_producer.produce destination, payload, {
    timeout_in_seconds: timeout, delete_on_timeout: delete_on_timeout
  }
end

#respond_to(destination) {|message, handler| ... } ⇒ #shutdown

Listens and responds to messages

This consumes messages on a given destination. It is useful for messages that have to be processed once and answered with a result.

Examples:

freddy.respond_to 'RegistrationService' do |attributes, handler|
  if id = register(attributes)
    handler.success(id: id)
  else
    handler.error(message: 'Can not do')
  end
end

Parameters:

  • destination (String)

    the queue name

Yield Parameters:

  • message (Hash<Symbol => Object>)

    Received message as a ruby hash with symbolized keys

  • handler (#success, #error)

    Handler for responding to messages. Use handler#success for a successful response and handler#error for an error response.

Returns:

  • (#shutdown)


# File 'lib/freddy.rb', line 81

def respond_to(destination, &callback)
  @logger.info "Listening for requests on #{destination}"

  channel = @connection.create_channel(prefetch: @prefetch_buffer_size)
  producer = Producers::ReplyProducer.new(channel, @logger)
  handler_adapter_factory = MessageHandlerAdapters::Factory.new(producer)

  Consumers::RespondToConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    destination: destination,
    channel: channel,
    handler_adapter_factory: handler_adapter_factory,
    &callback
  )
end
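
The block passed to #respond_to only needs an object that answers `success` and `error`, so its logic can be exercised without a broker. A minimal sketch, assuming a hypothetical `RecordingHandler` test double and a hypothetical `register` method; neither is part of Freddy:

```ruby
# Hypothetical test double: it records what the block sends instead of
# publishing a reply over AMQP.
class RecordingHandler
  attr_reader :responses

  def initialize
    @responses = []
  end

  def success(payload = nil)
    @responses << [:success, payload]
  end

  def error(payload = nil)
    @responses << [:error, payload]
  end
end

# Hypothetical registration logic: succeeds only when an email is given.
def register(attributes)
  attributes[:email] ? 42 : nil
end

# Same shape as the block in the respond_to example above.
responder = lambda do |attributes, handler|
  if (id = register(attributes))
    handler.success(id: id)
  else
    handler.error(message: 'Can not do')
  end
end

accepted = RecordingHandler.new
responder.call({ email: 'user@example.com' }, accepted)

rejected = RecordingHandler.new
responder.call({}, rejected)

puts accepted.responses.first.first  # success
puts rejected.responses.first.first  # error
```

The same lambda could then be handed to the real method with `freddy.respond_to('RegistrationService', &responder)`.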

#tap_into(pattern, options = {}) {|message| ... } ⇒ #shutdown

Listens for messages without consuming them

This listens for messages on a given destination or destinations without consuming them. It is useful for general messages that two or more clients are interested in.

Examples:

freddy.tap_into 'notifications.*' do |message|
  puts "Notification showed #{message.inspect}"
end

Parameters:

  • pattern (String)

    the destination pattern. Use the `#` wildcard to match zero or more words. Use `*` to match exactly one word.

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :group (String)

    only one of the listeners in the given group will receive a message. All listeners will receive a message if the group is not specified.

Yields:

  • (message)

    Yields received message to the block

Returns:

  • (#shutdown)


# File 'lib/freddy.rb', line 119

def tap_into(pattern, options = {}, &callback)
  @logger.debug "Tapping into messages that match #{pattern}"

  Consumers::TapIntoConsumer.consume(
    thread_pool: Thread.pool(@prefetch_buffer_size),
    pattern: pattern,
    channel: @connection.create_channel(prefetch: @prefetch_buffer_size),
    options: options,
    &callback
  )
end
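
The wildcard rules for `pattern` (`#` matches zero or more dot-separated words, `*` matches exactly one) can be illustrated with a small matcher. This is only a sketch of the rules as described above; the real matching is done by the message broker, and `topic_match?` and `match_words` are hypothetical names:

```ruby
# Returns true when the dot-separated routing key matches the pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Recursive word-by-word comparison of pattern and routing key.
def match_words(pattern, words)
  # An exhausted pattern matches only an exhausted routing key.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words: try every possible split point.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    # A literal word must match the next word exactly.
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?('notifications.*', 'notifications.email')       # true
puts topic_match?('notifications.*', 'notifications.email.sent')  # false
puts topic_match?('notifications.#', 'notifications')             # true
puts topic_match?('notifications.#', 'notifications.email.sent')  # true
```

Under these rules the `'notifications.*'` pattern from the example above receives `notifications.email` but not a bare `notifications` key.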