Class: Recurrent::Task
- Inherits:
-
Object
- Object
- Recurrent::Task
- Defined in:
- lib/recurrent/task.rb
Instance Attribute Summary collapse
-
#action ⇒ Object
Returns the value of attribute action.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#name ⇒ Object
Returns the value of attribute name.
-
#save ⇒ Object
Returns the value of attribute save.
-
#schedule ⇒ Object
Returns the value of attribute schedule.
-
#scheduler ⇒ Object
Returns the value of attribute scheduler.
-
#thread ⇒ Object
Returns the value of attribute thread.
Instance Method Summary collapse
- #call_action(execution_time = nil) ⇒ Object
- #execute(execution_time) ⇒ Object
- #handle_still_running(current_time) ⇒ Object
-
#initialize(options = {}) ⇒ Task
constructor
A new instance of Task.
- #next_occurrence ⇒ Object
- #running? ⇒ Boolean
- #save? ⇒ Boolean
- #save_results(return_value) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Task
Returns a new instance of Task.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/recurrent/task.rb', line 7
#
# Builds a task from an options hash. Every recognised key is copied into the
# instance variable of the same name; a missing key leaves that variable nil.
# Once the state is set, the optional Configuration.save_task_schedule hook is
# called with the task's name and schedule.
#
# @param options [Hash] may carry :name, :schedule, :action, :save, :logger,
#   :scheduler and :disable_task_locking
# @return [Task] a new instance of Task
def initialize(options = {})
  @name = options[:name]
  @schedule = options[:schedule]
  @action = options[:action]
  @save = options[:save]
  @logger = options[:logger]
  @scheduler = options[:scheduler]
  @disable_task_locking = options[:disable_task_locking]

  # The hook is optional; nothing is persisted when it is not configured.
  schedule_saver = Configuration.save_task_schedule
  schedule_saver.call(name, schedule) if schedule_saver
end
Instance Attribute Details
#action ⇒ Object
Returns the value of attribute action.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the action attribute.
#
# @return [Object] the current value of @action
def action
  @action
end
#logger ⇒ Object
Returns the value of attribute logger.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#name ⇒ Object
Returns the value of attribute name.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the name attribute.
#
# @return [Object] the current value of @name
def name
  @name
end
#save ⇒ Object
Returns the value of attribute save.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the save attribute.
#
# @return [Object] the current value of @save
def save
  @save
end
#schedule ⇒ Object
Returns the value of attribute schedule.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the schedule attribute.
#
# @return [Object] the current value of @schedule
def schedule
  @schedule
end
#scheduler ⇒ Object
Returns the value of attribute scheduler.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the scheduler attribute.
#
# @return [Object] the current value of @scheduler
def scheduler
  @scheduler
end
#thread ⇒ Object
Returns the value of attribute thread.
5 6 7 |
# File 'lib/recurrent/task.rb', line 5
#
# Reader for the thread attribute.
#
# @return [Object] the current value of @thread
def thread
  @thread
end
Instance Method Details
#call_action(execution_time = nil) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/recurrent/task.rb', line 38
#
# Invokes the task's action and, when the task is flagged for saving, hands
# the result to save_results.
#
# When Configuration.task_locking is set and this task has not disabled
# locking, the work is passed as a block to the locking callable; a falsy
# return from that callable is logged as "locked by another process".
# Otherwise the work runs directly.
#
# @param execution_time [Object, nil] the scheduled time of this run; used
#   for the log stamp and for holding the lock. May be nil.
# @return [Object, nil] without locking, whatever save_results returned (nil
#   when the task does not save); with locking, the result of the final log
#   call, or nil when the lock was established.
def call_action(execution_time = nil)
  # The work shared by both paths: run the action, passing it the previously
  # stored value when a loader is configured and the action takes exactly one
  # argument, then save the result if the task asks for it.
  perform = lambda do
    loader = Configuration.load_task_return_value
    return_value = if loader && action.arity == 1
                     action.call(loader.call(name))
                   else
                     action.call
                   end
    save_results(return_value) if save?
  end

  locker = Configuration.task_locking
  return perform.call unless locker && !@disable_task_locking

  # execution_time defaults to nil; format it only when present so the log
  # line cannot raise before the lock is even attempted.
  stamp = execution_time && execution_time.to_s(:seconds)
  logger.info "#{name} - #{stamp}: attempting to establish lock"

  lock_established = locker.call(name) do
    perform.call

    # If a task finishes quickly hold the lock for a few seconds to avoid
    # releasing it before other processes try to pick up the task.
    if execution_time
      sleep(1) until Time.now - execution_time > 5
    end
  end

  logger.info "#{name} - #{stamp}: locked by another process" unless lock_established
end
#execute(execution_time) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/recurrent/task.rb', line 18
#
# Starts one run of the task on a new thread, unless a previous run is still
# going, in which case the call is delegated to handle_still_running.
#
# The new thread records execution_time in its thread-local
# "execution_time" slot, bumps the scheduler's executing-task counter (when a
# scheduler is attached) and calls call_action. Any StandardError raised
# while doing so is logged as a warning together with its backtrace; the
# counter is always decremented afterwards.
#
# @param execution_time [Object] the scheduled time of this run
# @return [Thread, Object] the thread that was started, or the result of
#   handle_still_running when the previous run is still alive
def execute(execution_time)
  return handle_still_running(execution_time) if running?

  @thread = Thread.new do
    Thread.current["execution_time"] = execution_time
    scheduler && scheduler.increment_executing_tasks
    begin
      limit = Configuration.maximum_concurrent_tasks
      # The limit can only be enforced through a scheduler's counter; without
      # a scheduler there is nothing to compare against, so the task runs.
      over_limit = limit.present? && scheduler && scheduler.executing_tasks > limit
      call_action(execution_time) unless over_limit
    rescue => e
      logger.warn("#{name} - #{e.message}")
      logger.warn(e.backtrace)
    ensure
      scheduler && scheduler.decrement_executing_tasks
    end
  end
end
#handle_still_running(current_time) ⇒ Object
67 68 69 70 71 72 |
# File 'lib/recurrent/task.rb', line 67
#
# Called instead of starting a run when the previous run's thread is still
# alive. Logs that the new execution is being aborted and, when
# Configuration.handle_slow_task is set, calls it with the task name, the
# time of the skipped run and the start time of the run still in progress.
#
# @param current_time [Object] the scheduled time of the run being skipped
# @return [Object, nil] the hook's return value, or nil when no hook is set
def handle_still_running(current_time)
  # Start time of the run in progress, as stored in its thread-local slot.
  started_at = thread['execution_time']
  logger.info "#{name}: Execution from #{started_at.to_s(:seconds)} still running, aborting this execution."

  slow_task_hook = Configuration.handle_slow_task
  slow_task_hook.call(name, current_time, started_at) if slow_task_hook
end
#next_occurrence ⇒ Object
74 75 76 77 |
# File 'lib/recurrent/task.rb', line 74
#
# Asks the schedule for its next occurrence and stores that value back on the
# schedule as its start_date.
#
# @return [Object] the occurrence returned by schedule.next_occurrence
def next_occurrence
  # An attribute assignment evaluates to the assigned value, so the
  # occurrence is also this method's return value.
  schedule.start_date = schedule.next_occurrence
end
#running? ⇒ Boolean
96 97 98 |
# File 'lib/recurrent/task.rb', line 96
#
# Tells whether the thread held by this task is still alive.
#
# @return [Boolean] true when a thread is present and alive; false when
#   there is no thread or it has finished
def running?
  # Coerced to a strict boolean so the predicate never leaks nil when no
  # thread is present.
  !!(thread && thread.alive?)
end
#save? ⇒ Boolean
79 80 81 |
# File 'lib/recurrent/task.rb', line 79
#
# Tells whether the save attribute is truthy.
#
# @return [Boolean] true when save is neither nil nor false, false otherwise
def save?
  save ? true : false
end
#save_results(return_value) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/recurrent/task.rb', line 83
#
# Hands the action's return value to Configuration.save_task_return_value,
# when one is configured, as a hash with the keys :name, :return_value,
# :executed_at and :executed_by. Progress is logged before and after; when no
# saver is configured that is logged instead.
#
# @param return_value [Object] the value the action returned
# @return [Object] whatever the final logger.info call returns
def save_results(return_value)
  logger.info "#{name}: Wants to save its return value."

  saver = Configuration.save_task_return_value
  if saver
    # The execution time lives in the worker thread's "execution_time" slot.
    # Fall back to the current thread when no thread is stored on the task,
    # so a missing thread cannot raise here.
    worker = thread || Thread.current
    saver.call(:name => name,
               :return_value => return_value,
               :executed_at => worker['execution_time'],
               :executed_by => logger.identifier)
    logger.info "#{name}: Return value saved."
  else
    logger.info "#{name}: No method to save return values is configured."
  end
end