Module: CronoTrigger::Worker

Defined in:
lib/crono_trigger/worker.rb

Constant Summary collapse

HEARTBEAT_INTERVAL =
60
SIGNAL_FETCH_INTERVAL =
10
EXECUTOR_SHUTDOWN_TIMELIMIT =
300
OTHER_THREAD_SHUTDOWN_TIMELIMIT =
120

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#polling_threadsObject (readonly)

Returns the value of attribute polling_threads.



9
10
11
# File 'lib/crono_trigger/worker.rb', line 9

def polling_threads
  @polling_threads
end

Instance Method Details

#initializeObject



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/crono_trigger/worker.rb', line 11

def initialize
  @crono_trigger_worker_id = CronoTrigger.config.worker_id
  @stop_flag = ServerEngine::BlockingFlag.new
  @heartbeat_stop_flag = ServerEngine::BlockingFlag.new
  @signal_fetch_stop_flag = ServerEngine::BlockingFlag.new
  @model_queue = Queue.new
  @model_names = CronoTrigger.config.model_names || CronoTrigger::Schedulable.included_by
  @model_names.each do |model_name|
    @model_queue << model_name
  end
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: CronoTrigger.config.executor_thread,
    max_queue: CronoTrigger.config.executor_thread * 2,
  )
  @execution_counter = Concurrent::AtomicFixnum.new
  @logger = Logger.new(STDOUT) unless @logger
  ActiveRecord::Base.logger = @logger
end

#quiet?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/crono_trigger/worker.rb', line 69

def quiet?
  @polling_threads&.all?(&:quiet?)
end

#runObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/crono_trigger/worker.rb', line 31

def run
  @heartbeat_thread = run_heartbeat_thread
  @signal_fetcn_thread = run_signal_fetch_thread

  polling_thread_count = CronoTrigger.config.polling_thread || [@model_names.size, Concurrent.processor_count].min
  # Assign local variable for Signal handling
  polling_threads = polling_thread_count.times.map { PollingThread.new(@model_queue, @stop_flag, @logger, @executor, @execution_counter) }
  @polling_threads = polling_threads
  @polling_threads.each(&:run)

  ServerEngine::SignalThread.new do |st|
    st.trap(:TSTP) do
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Transit to quiet mode")
      polling_threads.each(&:quiet)
      heartbeat
    end
  end

  @polling_threads.each(&:join)

  @executor.shutdown
  @executor.wait_for_termination(EXECUTOR_SHUTDOWN_TIMELIMIT)
  @heartbeat_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)
  @signal_fetcn_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)

  unregister
end

#stopObject



59
60
61
62
63
# File 'lib/crono_trigger/worker.rb', line 59

def stop
  @stop_flag.set!
  @heartbeat_stop_flag.set!
  @signal_fetch_stop_flag.set!
end

#stopped?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/crono_trigger/worker.rb', line 65

def stopped?
  @stop_flag.set?
end