Class: Racecar::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(processor, config:, logger:, instrumenter: NullInstrumenter) ⇒ Runner

Returns a new instance of Runner.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/racecar/runner.rb', line 13

# Builds a runner around the given processor.
#
# Stores the collaborators, clears the stop flag, wires the logger (and the
# processor's statistics callback, when it has one) into Rdkafka::Config,
# and finally calls setup_pauses.
#
# NOTE(review): the Rdkafka::Config assignments are made on the constant
# itself, so they presumably apply to every rdkafka client in the process —
# confirm against the rdkafka documentation.
#
# @param processor the object whose messages this runner will drive
# @param config the configuration object (read by setup_pauses and #run)
# @param logger the logger, also handed to Rdkafka::Config.logger
# @param instrumenter instrumentation backend; defaults to NullInstrumenter
def initialize(processor, config:, logger:, instrumenter: NullInstrumenter)
  @processor = processor
  @config = config
  @logger = logger
  @instrumenter = instrumenter
  @stop_requested = false

  Rdkafka::Config.logger = logger

  # The statistics hook is optional: only register it when the processor
  # actually responds to it.
  if processor.respond_to?(:statistics_callback)
    stats_handler = processor.method(:statistics_callback).to_proc
    Rdkafka::Config.statistics_callback = stats_handler
  end

  setup_pauses
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/racecar/runner.rb', line 11

# Reader for @config, which #initialize sets from its `config:` keyword
# argument.
def config
  @config
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/racecar/runner.rb', line 11

# Reader for @logger, which #initialize sets from its `logger:` keyword
# argument.
def logger
  @logger
end

#processor ⇒ Object (readonly)

Returns the value of attribute processor.



11
12
13
# File 'lib/racecar/runner.rb', line 11

# Reader for @processor, which #initialize sets from its first positional
# argument.
def processor
  @processor
end

Instance Method Details

#run ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/racecar/runner.rb', line 48

# Runs the poll-and-process loop until the stop flag is set, then shuts down.
#
# Order of operations:
#   1. Calls install_signal_handlers and clears the stop flag.
#   2. Passes producer, consumer, instrumenter and config to
#      processor.configure.
#   3. Loops: resume_paused_partitions, then one poll/process step wrapped in
#      a "main_loop" instrumentation event.
#   4. Once the loop exits: processor.deliver!, processor.teardown,
#      consumer.commit, and finally consumer.close inside a "leave_group"
#      instrumentation event.
#
# The method-level `ensure` closes the producer (and Racecar::Datadog when
# that constant is defined) on every exit path, including exceptions.
#
# NOTE(review): install_signal_handlers, producer, consumer, process_method,
# process, process_batch and resume_paused_partitions are defined outside
# this view; their behaviour is not documented here.
def run
  install_signal_handlers
  # Reset the flag on every run; a #stop issued before this point is
  # therefore discarded.
  @stop_requested = false

  # Configure the consumer with a producer so it can produce messages and
  # with a consumer so that it can support advanced use-cases.
  processor.configure(
    producer:     producer,
    consumer:     consumer,
    instrumenter: @instrumenter,
    config:       @config,
  )

  # Payload attached to every "main_loop" instrumentation event.
  instrumentation_payload = {
    consumer_class: processor.class.to_s,
    consumer_set: consumer
  }

  # Main loop
  loop do
    # The stop flag is only checked here, so a stop request takes effect
    # after the current iteration has finished.
    break if @stop_requested
    resume_paused_partitions
    @instrumenter.instrument("main_loop", instrumentation_payload) do
      # There is no else branch: any other process_method value makes this
      # iteration a no-op.
      case process_method
      when :batch then
        # Split the polled batch by partition so each process_batch call
        # receives messages from a single partition value.
        msg_per_part = consumer.batch_poll(config.max_wait_time_ms).group_by(&:partition)
        msg_per_part.each_value do |messages|
          process_batch(messages)
        end
      when :single then
        message = consumer.poll(config.max_wait_time_ms)
        # Skip processing when the poll returned nothing.
        process(message) if message
      end
    end
  end

  logger.info "Gracefully shutting down"
  begin
    processor.deliver!
    processor.teardown
    consumer.commit
  ensure
    # Close the consumer even if deliver!, teardown or commit raised.
    @instrumenter.instrument('leave_group') do
      consumer.close
    end
  end
ensure
  # Runs on every exit path from #run, normal or exceptional.
  producer.close
  Racecar::Datadog.close if Object.const_defined?("Racecar::Datadog")
end

#setup_pauses ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/racecar/runner.rb', line 26

# Validates config.pause_timeout and builds the lazily populated @pauses
# lookup.
#
# How pause_timeout is mapped to the `timeout:` given to each Pause:
#   -1        => nil
#    0        => nil as well (no-op here; this case is handled elsewhere)
#   positive  => passed through unchanged
#   otherwise => ArgumentError
#
# @pauses is a two-level Hash. Reading @pauses[a][b] for a pair that has not
# been seen yet creates a ::Racecar::Pause, stores it, and returns it; later
# reads of the same pair return the stored object.
# NOTE(review): the two keys are presumably topic and partition — confirm
# against the callers.
#
# @return [Hash] the freshly assigned @pauses hash
# @raise [ArgumentError] when pause_timeout is below zero and not -1
def setup_pauses
  requested = config.pause_timeout

  timeout =
    if requested == -1 || requested == 0
      nil
    elsif requested > 0
      requested
    else
      raise ArgumentError, "Invalid value for pause_timeout: must be integer greater or equal -1"
    end

  # The remaining settings are read from config when a Pause is actually
  # built, not when this method runs.
  build_pause = lambda do
    ::Racecar::Pause.new(
      timeout: timeout,
      max_timeout: config.max_pause_timeout,
      exponential_backoff: config.pause_with_exponential_backoff
    )
  end

  @pauses = Hash.new do |outer, outer_key|
    outer[outer_key] = Hash.new { |inner, inner_key| inner[inner_key] = build_pause.call }
  end
end

#stop ⇒ Object



99
100
101
# File 'lib/racecar/runner.rb', line 99

# Requests that the main loop in #run exit.
#
# This only sets a flag. #run checks it at the top of each loop iteration, so
# the iteration in progress completes first. #run also resets the flag when
# it starts, so calling #stop before #run has no lasting effect.
def stop
  @stop_requested = true
end