Class: CronoTrigger::PollingThread

Inherits:
Object
  • Object
show all
Defined in:
lib/crono_trigger/polling_thread.rb

Instance Method Summary collapse

Constructor Details

#initialize(model_queue, stop_flag, logger, executor) ⇒ PollingThread

Returns a new instance of PollingThread.



3
4
5
6
7
8
# File 'lib/crono_trigger/polling_thread.rb', line 3

def initialize(model_queue, stop_flag, logger, executor)
  @model_queue = model_queue
  @stop_flag = stop_flag
  @logger = logger
  @executor = executor
end

Instance Method Details

#joinObject



28
29
30
# File 'lib/crono_trigger/polling_thread.rb', line 28

def join
  @thread.join
end

#poll(model) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/crono_trigger/polling_thread.rb', line 32

def poll(model)
  @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}"
  records = []
  primary_key_offset = nil
  begin
    model.connection_pool.with_connection do
      records = model.executables_with_lock(primary_key_offset: primary_key_offset)
      primary_key_offset = records.last && records.last.id
    end

    records.each do |record|
      @executor.post do
        model.connection_pool.with_connection do
          @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}"
          begin
            record.do_execute
          rescue Exception => e
            @logger.error(e)
          end
        end
      end
    end
  end while records.any?
end

#runObject



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/crono_trigger/polling_thread.rb', line 10

def run
  @thread = Thread.start do
    @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
    until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
      begin
        model = @model_queue.pop(true)
        poll(model)
      rescue ThreadError => e
        @logger.error(e) unless e.message == "queue empty"
      rescue => e
        @logger.error(e)
      ensure
        @model_queue << model if model
      end
    end
  end
end