Class: Kafka::Consumer
- Inherits:
- Object
- Object
- Kafka::Consumer
- Includes:
- Java::JavaLang::Runnable
- Defined in:
- lib/jruby-kafka/consumer.rb
Overview
noinspection JRubyStringImportInspection
Instance Method Summary
- #initialize(a_stream, a_thread_number, a_queue, restart_on_exception, a_sleep_ms) ⇒ Consumer (constructor)
  Returns a new instance of Consumer.
- #run ⇒ Object
Constructor Details
#initialize(a_stream, a_thread_number, a_queue, restart_on_exception, a_sleep_ms) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/jruby-kafka/consumer.rb', line 18
def initialize(a_stream, a_thread_number, a_queue, restart_on_exception, a_sleep_ms)
  @m_thread_number = a_thread_number
  @m_stream = a_stream
  @m_queue = a_queue
  @m_restart_on_exception = restart_on_exception
  @m_sleep_ms = 1.0 / 1000.0 * Float(a_sleep_ms)
end
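Despite its name, @m_sleep_ms ends up holding seconds, because Kernel#sleep takes its argument in seconds. The sketch below isolates that conversion in a hypothetical helper, sleep_seconds, which is not part of jruby-kafka. It only illustrates how Float() treats the kinds of values a caller might pass for a_sleep_ms.

```ruby
# Hypothetical helper mirroring the constructor's conversion; it is not
# part of jruby-kafka and exists only for illustration.
def sleep_seconds(a_sleep_ms)
  # Float() accepts integers, floats, and numeric strings. It raises
  # ArgumentError for non-numeric strings and TypeError for nil.
  1.0 / 1000.0 * Float(a_sleep_ms)
end

puts sleep_seconds(200)     # milliseconds given as an Integer
puts sleep_seconds('1500')  # milliseconds given as a numeric String

begin
  sleep_seconds('soon')
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Because Float() is strict, a malformed a_sleep_ms would presumably fail when the Consumer is constructed rather than later, inside the retry path.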
Instance Method Details
#run ⇒ Object
# File 'lib/jruby-kafka/consumer.rb', line 26
def run
  it = @m_stream.iterator
  begin
    while it.hasNext
      begin
        @m_queue << it.next.
      end
    end
  rescue Exception => e
    puts("#{self.class.name} caught exception: #{e.class.name}")
    puts(e.) if e. != ''
    if @m_restart_on_exception
      sleep(@m_sleep_ms)
      retry
    else
      raise e
    end
  end
end
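The rescue/retry flow in #run can be tried without JRuby or a Kafka broker. The following is a minimal sketch under stated assumptions: FakeMessage, FlakyIterator, and drain are hypothetical stand-ins, and the call elided after it.next in the listing is assumed here to be message. Unlike the original, the sketch rescues StandardError rather than Exception and writes to stderr with warn. As in #run, the iterator is obtained before the begin block, so retry resumes on the same iterator instead of creating a new one.

```ruby
require 'thread' # Queue; already loaded by default on modern Rubies

# Hypothetical stand-in for a consumed record exposing a payload.
FakeMessage = Struct.new(:message)

# Hypothetical iterator that raises once, on the fail_at-th call to #next.
class FlakyIterator
  def initialize(payloads, fail_at)
    @items = payloads.map { |p| FakeMessage.new(p) }
    @fail_at = fail_at
    @calls = 0
  end

  def hasNext
    !@items.empty?
  end

  def next
    @calls += 1
    raise IOError, 'simulated failure' if @calls == @fail_at
    @items.shift
  end
end

# Same control flow as #run: push payloads until the iterator is empty;
# on error either sleep and retry the begin block, or re-raise.
def drain(iterator, queue, restart_on_exception, sleep_seconds)
  begin
    queue << iterator.next.message while iterator.hasNext
  rescue StandardError => e
    warn("caught exception: #{e.class.name}")
    if restart_on_exception
      sleep(sleep_seconds)
      retry
    else
      raise
    end
  end
end

queue = Queue.new
drain(FlakyIterator.new(%w[a b c], 2), queue, true, 0.01)
puts queue.size
```

With restart_on_exception set to false, the same call would re-raise the IOError to the caller after logging it, which in a threaded setup would end that consumer thread.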