Class: CronoTrigger::PollingThread
- Inherits: Object
  - Object
    - CronoTrigger::PollingThread
- Defined in:
- lib/crono_trigger/polling_thread.rb
Instance Method Summary
- #alive? ⇒ Boolean
- #initialize(model_queue, stop_flag, logger, executor, execution_counter) ⇒ PollingThread (constructor): a new instance of PollingThread.
- #join ⇒ Object
- #poll(model) ⇒ Object
- #quiet ⇒ Object
- #quiet? ⇒ Boolean
- #run ⇒ Object
Constructor Details
#initialize(model_queue, stop_flag, logger, executor, execution_counter) ⇒ PollingThread
Returns a new instance of PollingThread.
3 4 5 6 7 8 9 10 |
# File 'lib/crono_trigger/polling_thread.rb', line 3
#
# Stores the collaborators used by the polling loop and starts in the
# non-quiet state.
#
# @param model_queue [#pop, #<<] queue of model names; #run pops a name,
#   polls the model, and pushes the name back
# @param stop_flag [#wait_for_set] flag #run waits on between polls; the
#   loop ends once it reports set
# @param logger [#debug, #info, #error] destination for log messages
# @param executor [#post] executor that #poll hands record processing to
# @param execution_counter [#increment, #decrement] counter bumped around
#   each record processed by the executor
def initialize(model_queue, stop_flag, logger, executor, execution_counter)
  @model_queue = model_queue
  @stop_flag = stop_flag
  @logger = logger
  @executor = executor
  @execution_counter = execution_counter
  # Flipped to true by #quiet; #run skips polling while it is true.
  @quiet = Concurrent::AtomicBoolean.new(false)
end
Instance Method Details
#alive? ⇒ Boolean
48 49 50 |
# File 'lib/crono_trigger/polling_thread.rb', line 48
#
# Reports whether the polling thread is still running.
#
# Delegates to the thread stored in @thread (assigned by #run), so it
# raises NoMethodError if called before #run.
#
# @return [Boolean] result of Thread#alive? on the polling thread
def alive?
  @thread.alive?
end
#join ⇒ Object
36 37 38 |
# File 'lib/crono_trigger/polling_thread.rb', line 36
#
# Blocks the caller until the polling thread finishes.
#
# Delegates to the thread stored in @thread (assigned by #run), so it
# raises NoMethodError if called before #run.
#
# @return [Object] whatever Thread#join returns for the polling thread
def join
  @thread.join
end
#poll(model) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/crono_trigger/polling_thread.rb', line 52
#
# Repeatedly fetches executable records for +model+ and posts each one to
# the executor, until a fetch returns no records or the executor rejects
# at least one record.
#
# @param model [#connection_pool, #executables_with_lock] model to poll
# @return [nil] result of the trailing do-while loop
def poll(model)
  @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}"
  records = []
  overflowed_record_ids = []

  # Do-while: the body always runs once, then repeats while the last batch
  # was non-empty and nothing has been rejected by the executor.
  begin
    # The connection is only held while fetching the batch.
    model.connection_pool.with_connection do
      records = model.executables_with_lock
    end

    records.each do |record|
      begin
        @executor.post do
          @execution_counter.increment
          begin
            process_record(record)
          ensure
            # Keep the counter balanced even when processing raises.
            @execution_counter.decrement
          end
        end
      rescue Concurrent::RejectedExecutionError
        # The executor refused the task; remember the record so it can be
        # handed to unlock_overflowed_records below.
        overflowed_record_ids << record.id
      end
    end

    # Called on every pass, also when no record overflowed.
    unlock_overflowed_records(model, overflowed_record_ids)
  end while overflowed_record_ids.empty? && records.any?
end
#quiet ⇒ Object
40 41 42 |
# File 'lib/crono_trigger/polling_thread.rb', line 40
#
# Switches this polling thread into quiet mode by setting the @quiet flag
# to true. #run checks the flag and skips polling while it is set.
#
# @return [Object] whatever the flag's #make_true returns
def quiet
  @quiet.make_true
end
#quiet? ⇒ Boolean
44 45 46 |
# File 'lib/crono_trigger/polling_thread.rb', line 44
#
# Reports whether quiet mode has been requested via #quiet.
#
# @return [Boolean] current state of the @quiet flag
def quiet?
  @quiet.true?
end
#run ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/crono_trigger/polling_thread.rb', line 12
#
# Starts the polling thread and stores it in @thread.
#
# The thread loops until the stop flag reports set, waiting up to
# CronoTrigger.config.polling_interval between passes. On each pass, unless
# quiet mode is on, it pops one model name from the queue without blocking,
# resolves it to a class, polls it, and pushes the name back onto the queue.
#
# @return [Thread] the started polling thread
def run
  @thread = Thread.start do
    @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
    until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
      next if quiet?

      CronoTrigger.reloader.call do
        begin
          # Non-blocking pop: raises ThreadError when the queue is empty.
          model_name = @model_queue.pop(true)
          model = model_name.classify.constantize
          poll(model)
        rescue ThreadError => e
          # An empty queue is expected and stays silent. The check must be
          # on the message: comparing the exception itself with a String is
          # never true and would log every empty poll as an error.
          @logger.error(e) unless e.message == "queue empty"
        rescue => ex
          @logger.error(ex)
          CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
        ensure
          # Put the name back so the model is polled again later, even if
          # this pass failed. Nothing was popped when the queue was empty.
          @model_queue << model_name if model_name
        end
      end
    end
  end
end