Module: Karafka::Schemas
- Defined in:
- lib/karafka/schemas/config.rb,
lib/karafka/schemas/consumer_group.rb,
lib/karafka/schemas/responder_usage.rb,
lib/karafka/schemas/server_cli_options.rb,
lib/karafka/schemas/consumer_group_topic.rb
Overview
Namespace for all the validation schemas that we use to check input
Constant Summary collapse
- TOPIC_REGEXP =
Regexp for validating format of groups and topics
/\A(\w|\-|\.)+\z/
- Config =
Note:
There are many more configuration options in the Karafka::Setup::Config model, but we don’t validate them here. Because they can be overwritten per route (topic + consumer_group), they are validated once all the routes are defined and ready.
Schema with validation rules for Karafka configuration details
Dry::Validation.Schema do
  # Validation rules for the top-level Karafka configuration details.

  # client_id has to be a non-empty string that matches TOPIC_REGEXP
  required(:client_id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  # shutdown_timeout is either nil or an integer greater than or equal to 0
  required(:shutdown_timeout) { none? | (int? & gteq?(0)) }
  # No value predicates are attached to the two mappers below; only the keys are declared
  required(:consumer_mapper)
  required(:topic_mapper)
  required(:params_base_class).filled
  # backend key may be omitted, but when it is present it has to be filled
  optional(:backend).filled
end
- ConsumerGroup =
Schema for single full route (consumer group + topics) validation.
Dry::Validation.Schema do
  # Valid uri schemas of Kafka broker url
  # The ||= is due to the behavior of require_all that resolves dependencies
  # but sometimes loads things twice
  URI_SCHEMES ||= %w[kafka kafka+ssl plaintext ssl].freeze

  # Available sasl scram mechanism of authentication (plus nil)
  SASL_SCRAM_MECHANISMS ||= %w[sha256 sha512].freeze

  configure do
    # Custom error messages are loaded from the errors.yml file shipped with the gem
    config.messages_file = File.join(
      Karafka.gem_root, 'config', 'errors.yml'
    )

    # Uri validator to check if uri is in a Karafka acceptable format
    # @param uri [String] uri we want to validate
    # @return [Boolean] truthy if the uri uses one of URI_SCHEMES and has a port,
    #   false when the scheme is not accepted or the uri cannot be parsed
    def broker_schema?(uri)
      uri = URI.parse(uri)
      URI_SCHEMES.include?(uri.scheme) && uri.port
    rescue URI::InvalidURIError
      false
    end
  end

  required(:id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  # Each seed broker has to pass the custom broker_schema? predicate defined above
  required(:seed_brokers).filled { each(:broker_schema?) }
  required(:session_timeout).filled { int? | float? }
  # pause_timeout is either nil or a non-negative number
  required(:pause_timeout) { none? | ((int? | float?) & gteq?(0)) }
  required(:offset_commit_interval) { int? | float? }
  required(:offset_commit_threshold).filled(:int?)
  # offset_retention_time may be nil; when it is not nil it has to be an integer
  required(:offset_retention_time) { none?.not > int? }
  required(:heartbeat_interval).filled { (int? | float?) & gteq?(0) }
  required(:fetcher_max_queue_size).filled(:int?, gt?: 0)
  required(:connect_timeout).filled { (int? | float?) & gt?(0) }
  required(:socket_timeout).filled { (int? | float?) & gt?(0) }
  required(:min_bytes).filled(:int?, gt?: 0)
  required(:max_bytes).filled(:int?, gt?: 0)
  required(:max_wait_time).filled { (int? | float?) & gteq?(0) }
  required(:batch_fetching).filled(:bool?)
  # Every topic of the group is validated against the ConsumerGroupTopic schema
  required(:topics).filled { each { schema(ConsumerGroupTopic) } }

  # Max wait time cannot exceed socket_timeout - wouldn't make sense
  # NOTE: the comparison is only applied when socket_timeout is an integer; a float
  # socket_timeout does not trigger this rule
  rule(
    max_wait_time_limit: %i[max_wait_time socket_timeout]
  ) do |max_wait_time, socket_timeout|
    socket_timeout.int? > max_wait_time.lteq?(value(:socket_timeout))
  end

  # All of the attributes below are optional and, when present, are nil or a string
  %i[
    ssl_ca_cert
    ssl_ca_cert_file_path
    ssl_client_cert
    ssl_client_cert_key
    sasl_gssapi_principal
    sasl_gssapi_keytab
    sasl_plain_authzid
    sasl_plain_username
    sasl_plain_password
    sasl_scram_username
    sasl_scram_password
  ].each do |encryption_attribute|
    optional(encryption_attribute).maybe(:str?)
  end

  optional(:ssl_ca_certs_from_system).maybe(:bool?)

  # It's not with other encryptions as it has some more rules
  optional(:sasl_scram_mechanism)
    .maybe(:str?, included_in?: Karafka::Schemas::SASL_SCRAM_MECHANISMS)
end
- ResponderUsageTopic =
Validator to check responder topic usage
Dry::Validation.Schema do
  # Validation rules for the usage details of a single responder topic.

  required(:name).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  required(:required).filled(:bool?)
  required(:multiple_usage).filled(:bool?)
  required(:usage_count).filled(:int?, gteq?: 0)
  # registered has to be exactly true, so an entry with registered: false is rejected
  required(:registered).filled(eql?: true)
  required(:async).filled(:bool?)

  # When the topic is flagged as required, it has to be used at least once
  rule(
    required_usage: %i[required usage_count]
  ) do |required, usage_count|
    required.true? > usage_count.gteq?(1)
  end

  # When the topic was used more than once, multiple_usage has to be true
  rule(
    multiple_usage_permission: %i[multiple_usage usage_count]
  ) do |multiple_usage, usage_count|
    usage_count.gt?(1) > multiple_usage.true?
  end

  # When multiple_usage is false, the topic can be used at most once
  rule(
    multiple_usage_block: %i[multiple_usage usage_count]
  ) do |multiple_usage, usage_count|
    multiple_usage.false? > usage_count.lteq?(1)
  end
end
- ResponderUsage =
Validator to check that everything in a responder flow matches responder rules
Dry::Validation.Schema do
  # Validation rules for the whole responder flow: both topic lists are checked
  # element by element against the ResponderUsageTopic schema.

  # The per-element check is only applied when the list is filled
  required(:used_topics) { filled? > each { schema(ResponderUsageTopic) } }
  required(:registered_topics) { filled? > each { schema(ResponderUsageTopic) } }
end
- ServerCliOptions =
Schema for validating correctness of the server CLI command options. We validate some basics plus the list of consumer_groups that we want to use, to make sure that all of them are defined, and we check that the pidfile does not already exist.
Dry::Validation.Schema do
  configure do
    option :consumer_groups

    # Extends the default error messages with the ones used by the custom
    # validation blocks defined below
    # @return [Hash] default messages merged with the custom English errors
    def self.messages
      super.merge(
        en: {
          errors: {
            consumer_groups_inclusion: 'Unknown consumer group.',
            pid_existence: 'Pidfile already exists.'
          }
        }
      )
    end
  end

  optional(:pid).filled(:str?)
  optional(:daemon).filled(:bool?)
  optional(:consumer_groups).filled(:array?)

  validate(consumer_groups_inclusion: :consumer_groups) do |consumer_groups|
    # If there were no consumer_groups declared in the server cli, it means that we will
    # run all of them and no need to validate them here at all
    if consumer_groups.nil?
      true
    else
      # Valid only when every requested group name is present among the routing builder names
      (consumer_groups - Karafka::Routing::Builder.instance.map(&:name)).empty?
    end
  end

  # When a pid path is given, a file must not already exist under that path
  validate(pid_existence: :pid) do |pid|
    pid ? !File.exist?(pid) : true
  end
end
- ConsumerGroupTopic =
Consumer group topic validation rules
Dry::Validation.Schema do
  # Validation rules for a single topic defined within a consumer group.

  # Both id and name have to be non-empty strings that match TOPIC_REGEXP
  required(:id).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  required(:name).filled(:str?, format?: Karafka::Schemas::TOPIC_REGEXP)
  # backend has to be one of the two listed symbols
  required(:backend).filled(included_in?: %i[inline sidekiq])
  required(:consumer).filled
  required(:parser).filled
  required(:max_bytes_per_partition).filled(:int?, gteq?: 0)
  required(:start_from_beginning).filled(:bool?)
  required(:batch_consuming).filled(:bool?)
  required(:persistent).filled(:bool?)
end