Module: Karafka::Pro::ScheduledMessages

Defined in:
lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb

Overview

This feature allows messages to be proxied via a special topic that dispatches them at a later time, hence the name: scheduled messages. Such messages need to follow a special format, but aside from that they are regular Kafka messages.

This work was conceptually inspired by the Go scheduler github.com/etf1/kafka-message-scheduler, though I did not look at its implementation, only at the concept of daily in-memory scheduling.
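To make the daily in-memory scheduling idea concrete, here is a minimal conceptual sketch. It is not Karafka's actual `DailyBuffer` implementation, just an illustration: messages due within the current day are held in memory keyed by their dispatch epoch and released once that time has passed.

```ruby
# Conceptual sketch only, NOT Karafka's actual DailyBuffer: messages due
# within the current day are buffered in memory under their dispatch
# epoch and dispatched once that epoch has passed.
class DailyBufferSketch
  def initialize
    # epoch (Unix timestamp) => array of messages due at that time
    @buffer = Hash.new { |hash, epoch| hash[epoch] = [] }
  end

  # Stores a message under its dispatch epoch
  def schedule(epoch, message)
    @buffer[epoch] << message
  end

  # Returns and removes all messages whose dispatch time has passed
  def due(now)
    ready = @buffer.keys.select { |epoch| epoch <= now }
    ready.flat_map { |epoch| @buffer.delete(epoch) }
  end
end

buffer = DailyBufferSketch.new
buffer.schedule(100, 'invoice-1')
buffer.schedule(200, 'invoice-2')
buffer.due(150) # => ["invoice-1"]
```

The real feature persists schedules in a Kafka topic and rebuilds this in-memory view per partition; the sketch only shows the buffering concept.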

Defined Under Namespace

Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup
Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker

Constant Summary collapse

SCHEMA_VERSION =

Version of the schema we use for envelopes in scheduled messages. We use it to detect potential upgrades, similarly to other Karafka components, and to stop processing of incompatible versions

'1.0.0'
STATES_SCHEMA_VERSION =

Version of the states schema. Used to publish simple per-partition aggregated metrics that can be used for schedules reporting

'1.0.0'
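Since both constants are semantic version strings, a compatibility check can compare them as versions. The guard below is a sketch of that idea using Ruby's built-in `Gem::Version`; the actual `SchemaValidator` may apply different rules.

```ruby
# Sketch of a schema compatibility guard (the real SchemaValidator may
# differ): refuse envelopes whose schema version is newer than the one
# this process understands.
SCHEMA_VERSION = '1.0.0'

def schema_compatible?(message_version)
  Gem::Version.new(message_version) <= Gem::Version.new(SCHEMA_VERSION)
end
```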

Class Method Summary collapse

Class Method Details

.cancel ⇒ Hash

Generates a tombstone message that cancels a given dispatch (if it has not happened yet)

Returns:

  • (Hash)

    tombstone cancelling message



# File 'lib/karafka/pro/scheduled_messages.rb', line 34

def cancel(**)
  Proxy.cancel(**)
end
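In Kafka, a tombstone is a record that carries a key with a nil payload, which lets log compaction eventually drop the matching scheduled record. The sketch below shows that general shape; the field names are generic illustrations, not Karafka's actual cancellation envelope.

```ruby
# Illustrative only: a Kafka tombstone pairs the original record key
# with a nil payload so log compaction can remove the scheduled record.
# These field names are NOT Karafka's actual cancellation envelope.
def tombstone_sketch(topic:, key:)
  { topic: topic, key: key, payload: nil }
end
```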

.post_fork(config, pre_fork_producer) ⇒ Object

Since custom producers that differ from the default one may be configured, we hold a reference to the old pre-fork producer. When we initialize again post-fork, as long as the user relies on the defaults, we re-inherit the producer from the default config.

Parameters:

  • config (Karafka::Core::Configurable::Node)
  • pre_fork_producer (WaterDrop::Producer)


# File 'lib/karafka/pro/scheduled_messages.rb', line 65

def post_fork(config, pre_fork_producer)
  return unless config.scheduled_messages.producer == pre_fork_producer

  config.scheduled_messages.producer = config.producer
end
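The guard above can be illustrated with plain structs: the scheduled messages producer is replaced only when it still points at the pre-fork default, so a custom producer configured by the user survives the fork untouched. The struct names here are hypothetical stand-ins for the real config nodes.

```ruby
# Sketch of the post-fork producer swap using plain structs (hypothetical
# stand-ins for Karafka's config nodes): re-inherit the default producer
# only when the user did not configure a custom one.
SettingsSketch = Struct.new(:producer)
ConfigSketch = Struct.new(:producer, :scheduled_messages)

def post_fork_sketch(config, pre_fork_producer)
  return unless config.scheduled_messages.producer == pre_fork_producer

  config.scheduled_messages.producer = config.producer
end
```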

.post_setup(config) ⇒ Object

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 51

def post_setup(config)
  ScheduledMessages::Contracts::Config.new.validate!(
    config.to_h,
    scope: %w[config]
  )
end

.pre_setup(config) ⇒ Object

Sets up the additional config scope, validations and other feature-related components

Parameters:

  • config (Karafka::Core::Configurable::Node)

    root node config



# File 'lib/karafka/pro/scheduled_messages.rb', line 43

def pre_setup(config)
  # Expand the config with this feature specific stuff
  config.instance_eval do
    setting(:scheduled_messages, default: Setup::Config.config)
  end
end

.schedule ⇒ Hash

Runs `Proxy.schedule`

Returns:

  • (Hash)

    message wrapped with the scheduled message envelope



# File 'lib/karafka/pro/scheduled_messages.rb', line 28

def schedule(**)
  Proxy.schedule(**)
end
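Conceptually, the proxy wraps a regular Kafka message hash in an envelope targeting the scheduling topic, with the dispatch time carried alongside. The plain-hash sketch below illustrates that wrapping; the field and header names are assumptions for illustration, not Karafka's actual envelope format.

```ruby
require 'json'

# Hypothetical envelope sketch (field and header names are assumptions,
# NOT the real Karafka envelope): the target message travels serialized
# in the payload while the dispatch time rides in a header.
def schedule_sketch(message:, epoch:, envelope:)
  {
    topic: envelope.fetch(:topic),
    key: message[:key],
    payload: JSON.generate(message),
    headers: { 'schedule_target_epoch' => epoch.to_s }
  }
end

schedule_sketch(
  message: { topic: 'events', key: 'user-1', payload: 'data' },
  epoch: 1_700_000_000,
  envelope: { topic: 'scheduled_messages' }
)
```

The dispatching consumer would later unwrap the payload and produce the original message to its target topic once the epoch has passed.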