Module: Karafka::Schemas

Defined in:
lib/karafka/schemas/config.rb,
lib/karafka/schemas/consumer_group.rb,
lib/karafka/schemas/responder_usage.rb,
lib/karafka/schemas/server_cli_options.rb,
lib/karafka/schemas/consumer_group_topic.rb

Overview

Namespace for all the validation schemas that we use to check input

Constant Summary collapse

TOPIC_REGEXP =

Regexp for validating format of groups and topics

# Anchored to the whole string (\A ... \z): one or more word characters, hyphens or dots
/\A(\w|\-|\.)+\z/
Config =
Note:

There are many more configuration options inside of the Karafka::Setup::Config model, but we don’t validate them here. Because they can be overwritten per route (topic + consumer_group), they are validated per each route, once all the routes are defined and ready.

Schema with validation rules for Karafka configuration details

Dry::Validation.Schema do
  # Must be a filled string that matches TOPIC_REGEXP
  required(:client_id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  # Either nil or an integer greater than or equal to 0
  required(:shutdown_timeout) { none? | (int? & gteq?(0)) }
  # For both mappers only the key itself is declared as required; no value
  # predicates are attached to them in this schema
  required(:consumer_mapper)
  required(:topic_mapper)
  # Must be present and filled, no type constraint is declared
  required(:params_base_class).filled

  # May be omitted, but when the key is given its value has to be filled
  optional(:backend).filled
end
ConsumerGroup =

Schema for single full route (consumer group + topics) validation.

Dry::Validation.Schema do
  # Valid uri schemas of Kafka broker url
  # The ||= is due to the behavior of require_all that resolves dependencies
  # but sometimes loads things twice
  URI_SCHEMES ||= %w[kafka kafka+ssl plaintext ssl].freeze

  # Available sasl scram mechanism of authentication (plus nil)
  SASL_SCRAM_MECHANISMS ||= %w[sha256 sha512].freeze

  configure do
    # Custom error messages for this schema are loaded from the gem's errors.yml
    config.messages_file = File.join(
      Karafka.gem_root, 'config', 'errors.yml'
    )

    # Uri validator to check if uri is in a Karafka acceptable format
    # A broker uri is acceptable when it parses, uses one of the URI_SCHEMES
    # and contains a port
    # @param uri [String] uri we want to validate
    # @return [Boolean] true if it is a valid uri, otherwise false
    def broker_schema?(uri)
      parsed = URI.parse(uri)
      # Explicit nil check, so we always return a strict boolean and never the port itself
      URI_SCHEMES.include?(parsed.scheme) && !parsed.port.nil?
    rescue URI::InvalidURIError
      false
    end
  end

  required(:id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  required(:seed_brokers).filled { each(:broker_schema?) }
  required(:session_timeout).filled { int? | float? }
  required(:pause_timeout) { none? | ((int? | float?) & gteq?(0)) }
  required(:offset_commit_interval) { int? | float? }
  required(:offset_commit_threshold).filled(:int?)
  required(:offset_retention_time) { none?.not > int? }
  required(:heartbeat_interval).filled { (int? | float?) & gteq?(0) }
  required(:fetcher_max_queue_size).filled(:int?, gt?: 0)
  required(:connect_timeout).filled { (int? | float?) & gt?(0) }
  required(:socket_timeout).filled { (int? | float?) & gt?(0) }
  required(:min_bytes).filled(:int?, gt?: 0)
  required(:max_bytes).filled(:int?, gt?: 0)
  required(:max_wait_time).filled { (int? | float?) & gteq?(0) }
  required(:batch_fetching).filled(:bool?)
  required(:topics).filled { each { schema(ConsumerGroupTopic) } }

  # Max wait time cannot exceed socket_timeout - wouldn't make sense
  # The socket_timeout is allowed to be either an integer or a float, so the limit
  # has to be checked for both of those types (not only for integers)
  rule(
    max_wait_time_limit: %i[max_wait_time socket_timeout]
  ) do |max_wait_time, socket_timeout|
    (socket_timeout.int? | socket_timeout.float?) >
      max_wait_time.lteq?(value(:socket_timeout))
  end

  # All of those encryption and authentication settings are optional and when
  # present, they need to be either nil or a string
  %i[
    ssl_ca_cert
    ssl_ca_cert_file_path
    ssl_client_cert
    ssl_client_cert_key
    sasl_gssapi_principal
    sasl_gssapi_keytab
    sasl_plain_authzid
    sasl_plain_username
    sasl_plain_password
    sasl_scram_username
    sasl_scram_password
  ].each do |encryption_attribute|
    optional(encryption_attribute).maybe(:str?)
  end

  optional(:ssl_ca_certs_from_system).maybe(:bool?)

  # It's not with other encryptions as it has some more rules
  optional(:sasl_scram_mechanism)
    .maybe(:str?, included_in?: Karafka::Schemas::SASL_SCRAM_MECHANISMS)
end
ResponderUsageTopic =

Validator to check responder topic usage

Dry::Validation.Schema do
  # Topic name has to be a filled string that matches TOPIC_REGEXP
  required(:name).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  required(:required).filled(:bool?)
  required(:multiple_usage).filled(:bool?)
  # Usage counter needs to be a non-negative integer
  required(:usage_count).filled(:int?, gteq?: 0)
  # The registered flag is only accepted when it equals true
  required(:registered).filled(eql?: true)
  required(:async).filled(:bool?)

  # When the topic is marked as required, it needs to be used at least once
  rule(
    required_usage: %i[required usage_count]
  ) do |required, usage_count|
    required.true? > usage_count.gteq?(1)
  end

  # When the topic was used more than once, multiple usage has to be enabled
  rule(
    multiple_usage_permission: %i[multiple_usage usage_count]
  ) do |multiple_usage, usage_count|
    usage_count.gt?(1) > multiple_usage.true?
  end

  # When multiple usage is disabled, the topic can be used at most once
  rule(
    multiple_usage_block: %i[multiple_usage usage_count]
  ) do |multiple_usage, usage_count|
    multiple_usage.false? > usage_count.lteq?(1)
  end
end
ResponderUsage =

Validator to check that everything in a responder flow matches responder rules

Dry::Validation.Schema do
  # Both keys are required. When a given collection is filled, each of its
  # elements is validated against the ResponderUsageTopic schema
  required(:used_topics) { filled? > each { schema(ResponderUsageTopic) } }
  required(:registered_topics) { filled? > each { schema(ResponderUsageTopic) } }
end
ServerCliOptions =

Schema for validating correctness of the server cli command options. We validate some basics plus the list of consumer_groups that we want to use, to make sure that all of them are defined, and that a pidfile does not already exist.

Dry::Validation.Schema do
  configure do
    # Declares consumer_groups as an option of this schema
    option :consumer_groups

    # Adds the error messages of our custom validations on top of the default ones
    # @return [Hash] default messages merged with the custom english error messages
    def self.messages
      custom_errors = {
        consumer_groups_inclusion: 'Unknown consumer group.',
        pid_existence: 'Pidfile already exists.'
      }

      super.merge(en: { errors: custom_errors })
    end
  end

  optional(:pid).filled(:str?)
  optional(:daemon).filled(:bool?)
  optional(:consumer_groups).filled(:array?)

  # When no consumer groups were given in the server cli, all of them will run, so
  # there is nothing to check. Otherwise each requested group has to be present among
  # the names taken from the routing builder
  validate(consumer_groups_inclusion: :consumer_groups) do |requested_groups|
    requested_groups.nil? ||
      (requested_groups - Karafka::Routing::Builder.instance.map(&:name)).empty?
  end

  # Without a pid there is nothing to check; with it, the file must not exist yet
  validate(pid_existence: :pid) do |pidfile|
    !(pidfile && File.exist?(pidfile))
  end
end
ConsumerGroupTopic =

Consumer group topic validation rules

Dry::Validation.Schema do
  # Both the id and the name have to be filled strings that match TOPIC_REGEXP
  required(:id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  required(:name).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  # Only the :inline and :sidekiq backends are accepted
  required(:backend).filled(included_in?: %i[inline sidekiq])
  # Consumer and parser need to be present and filled, no type constraint is declared
  required(:consumer).filled
  required(:parser).filled
  # Needs to be a non-negative integer
  required(:max_bytes_per_partition).filled(:int?, gteq?: 0)
  # Boolean flags of the topic
  required(:start_from_beginning).filled(:bool?)
  required(:batch_consuming).filled(:bool?)
  required(:persistent).filled(:bool?)
end