Module: DeliveryBoy

Defined in:
lib/delivery_boy.rb,
lib/delivery_boy/fake.rb,
lib/delivery_boy/config.rb,
lib/delivery_boy/railtie.rb,
lib/delivery_boy/version.rb,
lib/delivery_boy/instance.rb,
lib/delivery_boy/config_error.rb,
lib/generators/delivery_boy/install_generator.rb

Defined Under Namespace

Modules: Generators Classes: Config, Fake, Instance, Railtie

Constant Summary collapse

VERSION =
"1.2.0"
ConfigError =
Class.new(StandardError)

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.loggerLogger

The logger used by DeliveryBoy.

Returns:

  • (Logger)


106
107
108
109
110
111
112
# File 'lib/delivery_boy.rb', line 106

def logger
  @logger ||= Logger.new($stdout).tap do |logger|
    if config.log_level
      logger.level = Object.const_get("Logger::#{config.log_level.upcase}")
    end
  end
end

Class Method Details

.buffer_sizeObject

Return the number of messages in the buffer



90
91
92
# File 'lib/delivery_boy.rb', line 90

def buffer_size
  instance.buffer_size
end

.clear_bufferObject

Clear any buffered messages generated by produce or produce! methods.



85
86
87
# File 'lib/delivery_boy.rb', line 85

def clear_buffer
  instance.clear_buffer
end

.clear_config!Object



125
126
127
# File 'lib/delivery_boy.rb', line 125

def clear_config!
  @config = nil
end

.configDeliveryBoy::Config

The configuration used by DeliveryBoy.

Returns:



119
120
121
122
123
# File 'lib/delivery_boy.rb', line 119

def config
  @config ||= DeliveryBoy::Config.new(env: ENV)
rescue KingKonf::ConfigError => e
  raise ConfigError, e.message
end

.configure {|DeliveryBoy::Config| ... } ⇒ nil

Configure DeliveryBoy in a block.

DeliveryBoy.configure do |config|
  config.client_id = "yolo"
end

Yields:

Returns:

  • (nil)


137
138
139
# File 'lib/delivery_boy.rb', line 137

def configure
  yield config
end

.deliver(value, topic:, **options) ⇒ nil

Write a message to a specified Kafka topic synchronously.

Keep in mind that the client will block until the message has been delivered.

Parameters:

  • value (String)

    the message value.

  • topic (String)

    the topic that the message should be written to.

  • key (String, nil)

    the message key.

  • partition (Integer, nil)

    the topic partition that the message should be written to.

  • partition_key (String, nil)

    a key used to deterministically assign a partition to the message.

Returns:

  • (nil)

Raises:

  • (Kafka::BufferOverflow)

    if the producer’s buffer is full.

  • (Kafka::DeliveryFailed)

    if delivery failed for some reason.



28
29
30
# File 'lib/delivery_boy.rb', line 28

def deliver(value, topic:, **options)
  instance.deliver(value, topic: topic, **options)
end

.deliver_async(value, topic:, **options) ⇒ nil

Like deliver_async!, but handles Kafka::BufferOverflow errors by logging them and just going on with normal business.

Returns:

  • (nil)


36
37
38
39
40
# File 'lib/delivery_boy.rb', line 36

def deliver_async(value, topic:, **options)
  deliver_async!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end

.deliver_async!(value, topic:, **options) ⇒ nil

Like deliver, but returns immediately.

The actual delivery takes place in a background thread.

Returns:

  • (nil)


47
48
49
# File 'lib/delivery_boy.rb', line 47

def deliver_async!(value, topic:, **options)
  instance.deliver_async!(value, topic: topic, **options)
end

.deliver_messagesnil

Delivers the items currently in the producer buffer.

Returns:

  • (nil)

Raises:

  • (Kafka::DeliveryFailed)

    if delivery failed for some reason.



80
81
82
# File 'lib/delivery_boy.rb', line 80

def deliver_messages
  instance.deliver_messages
end

.produce(value, topic:, **options) ⇒ nil

Like produce!, but handles Kafka::BufferOverflow errors by logging them and just going on with normal business.

Returns:

  • (nil)


55
56
57
58
59
# File 'lib/delivery_boy.rb', line 55

def produce(value, topic:, **options)
  produce!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end

.produce!(value, topic:, **options) ⇒ nil

Appends the given message to the producer buffer but does not send it until deliver_messages is called.

Parameters:

  • value (String)

    the message value.

  • topic (String)

    the topic that the message should be written to.

  • key (String, nil)

    the message key.

  • partition (Integer, nil)

    the topic partition that the message should be written to.

  • partition_key (String, nil)

    a key used to deterministically assign a partition to the message.

Returns:

  • (nil)

Raises:

  • (Kafka::BufferOverflow)

    if the producer’s buffer is full.



72
73
74
# File 'lib/delivery_boy.rb', line 72

def produce!(value, topic:, **options)
  instance.produce(value, topic: topic, **options)
end

.shutdownnil

Shut down DeliveryBoy.

Automatically called when the process exits.

Returns:

  • (nil)


99
100
101
# File 'lib/delivery_boy.rb', line 99

def shutdown
  instance.shutdown
end

.test_mode!Object



141
142
143
# File 'lib/delivery_boy.rb', line 141

def test_mode!
  @instance = testing
end

.testingObject



145
146
147
# File 'lib/delivery_boy.rb', line 145

def testing
  @testing ||= Fake.new
end