Class: CronoTrigger::PollingThread
- Inherits:
-
Object
- Object
- CronoTrigger::PollingThread
- Defined in:
- lib/crono_trigger/polling_thread.rb
Instance Method Summary collapse
-
#initialize(model_queue, stop_flag, logger, executor) ⇒ PollingThread
constructor
A new instance of PollingThread.
- #join ⇒ Object
- #poll(model) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(model_queue, stop_flag, logger, executor) ⇒ PollingThread
Returns a new instance of PollingThread.
3 4 5 6 7 8 |
# File 'lib/crono_trigger/polling_thread.rb', line 3 def initialize(model_queue, stop_flag, logger, executor) @model_queue = model_queue @stop_flag = stop_flag @logger = logger @executor = executor end |
Instance Method Details
#join ⇒ Object
28 29 30 |
# File 'lib/crono_trigger/polling_thread.rb', line 28 def join @thread.join end |
#poll(model) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/crono_trigger/polling_thread.rb', line 32 def poll(model) @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}" records = [] primary_key_offset = nil begin model.connection_pool.with_connection do records = model.executables_with_lock(primary_key_offset: primary_key_offset) primary_key_offset = records.last && records.last.id end records.each do |record| @executor.post do model.connection_pool.with_connection do @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}" begin record.do_execute rescue Exception => e @logger.error(e) end end end end end while records.any? end |
#run ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/crono_trigger/polling_thread.rb', line 10 def run @thread = Thread.start do @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread" until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval) begin model = @model_queue.pop(true) poll(model) rescue ThreadError => e @logger.error(e) unless e. == "queue empty" rescue => e @logger.error(e) ensure @model_queue << model if model end end end end |