Class: Recurrent::Worker
- Inherits: Object
  - Object
    - Recurrent::Worker
- Defined in:
- lib/recurrent/worker.rb
Instance Attribute Summary
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#scheduler ⇒ Object
Returns the value of attribute scheduler.
Instance Method Summary
- #execute ⇒ Object
- #execute_with_locking ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #start ⇒ Object
- #wait_for_running_tasks ⇒ Object
- #wait_for_running_tasks_for(seconds) ⇒ Object
- #wait_for_running_tasks_indefinitely ⇒ Object
- #wait_until(time) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/recurrent/worker.rb', line 6
#
# Builds a worker: applies configuration, creates the scheduler and, when
# +options[:every]+ is given, registers one recurring task on it.
#
# @param options [Hash] settings read by this constructor:
#   * :maximum_concurrent_tasks - stored on Configuration (nil when absent)
#   * :file   - passed to Scheduler.new
#   * :every  - Ruby source; evaluated and converted with #to_i to get the
#               frequency handed to Scheduler#every
#   * :name   - passed to Scheduler#every together with the frequency
#   * :ruby   - Ruby source evaluated every time the registered block runs
#   * :system - command given to Kernel#system every time the registered
#               block runs (only consulted when :ruby is absent)
# @return [Worker] a new instance of Worker
def initialize(options = {})
  Configuration.maximum_concurrent_tasks = options[:maximum_concurrent_tasks]
  Configuration.setup.call if Configuration.setup

  @scheduler = Scheduler.new(options[:file])

  if options[:every]
    # SECURITY: :every and :ruby are executed verbatim with eval. Only pass
    # values from a trusted source (NOTE(review): presumably the operator's
    # own command line -- confirm against the caller).
    every = eval(options[:every]).to_i

    if options[:ruby]
      @scheduler.every(every, options[:name]) do
        eval(options[:ruby])
      end
    elsif options[:system]
      @scheduler.every(every, options[:name]) do
        system(options[:system])
      end
    end
  end

  @logger = scheduler.logger
end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
4 5 6 |
# File 'lib/recurrent/worker.rb', line 4
#
# Returns the value of attribute logger.
#
# @return [Object] whatever is currently stored in @logger
def logger
  @logger
end
#scheduler ⇒ Object
Returns the value of attribute scheduler.
4 5 6 |
# File 'lib/recurrent/worker.rb', line 4
#
# Returns the value of attribute scheduler.
#
# @return [Object] whatever is currently stored in @scheduler
def scheduler
  @scheduler
end
Instance Method Details
#execute ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/recurrent/worker.rb', line 42
#
# Main loop: repeatedly asks the scheduler for the next execution time,
# waits for it, and runs every task scheduled for that time.
#
# The global $exit flag is checked before waiting, after waiting and after
# the tasks were started; once it is set, the loop waits for running tasks
# (see #wait_for_running_tasks) and then ends.
#
# @return [nil] the value of the terminated loop
def execute
  loop do
    execution_time = scheduler.tasks.next_execution_time
    # The task list is fetched before waiting and run in reverse of the
    # order the scheduler returns it in.
    tasks_to_execute = scheduler.tasks.scheduled_to_execute_at(
      execution_time,
      :sort_by_frequency => !!Configuration.maximum_concurrent_tasks
    ).reverse

    # Leave only once the wait reports success (a truthy result).
    break if $exit && wait_for_running_tasks

    wait_until(execution_time)
    break if $exit && wait_for_running_tasks

    tasks_to_execute.each do |task|
      logger.info "#{task.name}: Executing at #{execution_time.to_s(:seconds)}"
      task.execute(execution_time)
    end

    break if $exit && wait_for_running_tasks
  end
end
#execute_with_locking ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/recurrent/worker.rb', line 62
#
# Runs #execute inside the callable configured as
# Configuration.process_locking, retrying every five seconds until that
# callable returns a truthy value or the global $exit flag is set.
#
# The callable receives the names of all scheduler tasks as arguments and a
# block that invokes #execute.
#
# @return [nil]
def execute_with_locking
  lock_established = nil

  until lock_established
    break if $exit

    lock_established = Configuration.process_locking.call(*scheduler.tasks.map(&:name)) do
      execute
    end

    # Once the lock was obtained the loop is over; do not report that
    # another process holds it and do not sleep before returning.
    break if $exit || lock_established

    logger.info 'Tasks are being monitored by another process. Standing by.'
    sleep(5)
  end
end
#start ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/recurrent/worker.rb', line 26
#
# Entry point: installs handlers for TERM, INT and QUIT that request a
# shutdown through the global $exit flag, then runs the worker loop -- with
# locking when Configuration.process_locking is set, without it otherwise.
#
# @return [Object] the result of the final logger call
def start
  logger.info "Starting Recurrent"

  %w[TERM INT QUIT].each do |signal|
    trap(signal) do
      # Raise the flag first so the shutdown request is recorded even if
      # the log call below fails.
      # NOTE(review): some logger implementations cannot take their lock
      # inside a signal handler -- confirm the scheduler's logger can.
      $exit = true
      logger.info 'Waiting for running tasks and exiting...'
    end
  end

  if Configuration.process_locking
    execute_with_locking
  else
    execute
  end

  logger.info("Goodbye.")
end
#wait_for_running_tasks ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/recurrent/worker.rb', line 75
#
# Waits for running tasks: for a bounded time when
# Configuration.wait_for_running_tasks_on_exit_for is set, otherwise
# without a limit.
#
# @return [Object] the result of the wait method that was chosen
def wait_for_running_tasks
  grace_period = Configuration.wait_for_running_tasks_on_exit_for
  return wait_for_running_tasks_for(grace_period) if grace_period

  wait_for_running_tasks_indefinitely
end
#wait_for_running_tasks_for(seconds) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/recurrent/worker.rb', line 83
#
# Gives running tasks +seconds+ seconds to finish, checking once per
# second, then kills the threads of the tasks that are still running.
#
# @param seconds [Numeric] grace period; decremented by one per iteration
# @return [true] once the scheduler reports no running tasks
def wait_for_running_tasks_for(seconds)
  while scheduler.tasks.running.any?
    logger.info "Killing running tasks in #{seconds.inspect}."
    seconds -= 1
    sleep(1)

    # `<= 0` instead of `== 0`: a grace period that starts at zero, is
    # negative or is fractional never equals zero exactly, which would leave
    # this loop spinning without ever killing anything. It also retries the
    # kill on every later pass while a task is still reported as running.
    next if seconds > 0

    scheduler.tasks.running.each do |task|
      logger.info "Killing #{task.name}."
      # Drop the thread reference unless the killed thread still reports
      # being alive.
      task.thread = nil unless task.thread.try(:kill).try(:alive?)
    end
  end

  true
end
#wait_for_running_tasks_indefinitely ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/recurrent/worker.rb', line 98
#
# Joins the thread of the first running task, over and over, until the
# scheduler reports no running task.
#
# Written as a loop rather than by calling itself, so the call stack does
# not grow with the number of passes.
#
# @return [true] once no task is running
def wait_for_running_tasks_indefinitely
  while (task = scheduler.tasks.running.first)
    logger.info "Waiting for #{task.name} to finish."
    task.thread.try(:join)
  end

  logger.info "All tasks finished, exiting..."
  true
end
#wait_until(time) ⇒ Object
109 110 111 112 113 114 |
# File 'lib/recurrent/worker.rb', line 109
#
# Blocks, sleeping in half-second steps, until +time+ reports being in the
# past or the global $exit flag is set.
#
# @param time [#past?] the moment to wait for
# @return [nil]
def wait_until(time)
  # `time.past?` is evaluated before $exit on every pass.
  sleep(0.5) until time.past? || $exit
end