Class: Recurrent::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/recurrent/task.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Task

Returns a new instance of Task.



7
8
9
10
11
12
13
14
15
16
# File 'lib/recurrent/task.rb', line 7

# Builds a task from an options hash and, when a schedule-persistence hook is
# configured, reports the new task's name and schedule to it.
#
# @param options [Hash] recognised keys: :name, :schedule, :action, :save,
#   :logger, :scheduler, :disable_task_locking (missing keys leave the
#   corresponding attribute nil)
def initialize(options={})
  @name, @schedule, @action, @save, @logger, @scheduler, @disable_task_locking =
    options.values_at(:name, :schedule, :action, :save, :logger, :scheduler, :disable_task_locking)

  # The hook is optional; only invoke it when one has been configured.
  if Configuration.save_task_schedule
    Configuration.save_task_schedule.call(name, schedule)
  end
end

Instance Attribute Details

#action ⇒ Object

Returns the value of attribute action.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the task's action.
#
# @return [Object] the value of @action, which the constructor takes from
#   options[:action]; #call_action invokes it via #call and inspects #arity
def action
  @action
end

#logger ⇒ Object

Returns the value of attribute logger.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the task's logger.
#
# @return [Object] the value of @logger, which the constructor takes from
#   options[:logger]; other methods in this class call #info, #warn and
#   #identifier on it
def logger
  @logger
end

#name ⇒ Object

Returns the value of attribute name.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the task's name.
#
# @return [Object] the value of @name, which the constructor takes from
#   options[:name]; it prefixes log lines and is passed to the Configuration
#   callbacks
def name
  @name
end

#save ⇒ Object

Returns the value of attribute save.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the raw save flag.
#
# @return [Object] the value of @save, which the constructor takes from
#   options[:save]; #save? exposes its truthiness as a strict boolean
def save
  @save
end

#schedule ⇒ Object

Returns the value of attribute schedule.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the task's schedule.
#
# @return [Object] the value of @schedule, which the constructor takes from
#   options[:schedule]; #next_occurrence calls #next_occurrence and
#   #start_date= on it
def schedule
  @schedule
end

#scheduler ⇒ Object

Returns the value of attribute scheduler.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the scheduler this task reports to.
#
# @return [Object, nil] the value of @scheduler, which the constructor takes
#   from options[:scheduler]; #execute treats it as optional when adjusting
#   the executing-task counter
def scheduler
  @scheduler
end

#thread ⇒ Object

Returns the value of attribute thread.



5
6
7
# File 'lib/recurrent/task.rb', line 5

# Reader for the thread of the most recent execution.
#
# @return [Thread, nil] the value of @thread; #execute assigns the Thread it
#   spawns here and the constructor does not set it, so it is presumably nil
#   until the first #execute call
def thread
  @thread
end

Instance Method Details

#call_action(execution_time = nil) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/recurrent/task.rb', line 38

# Runs the task's action, under the configured task lock when one applies.
#
# When Configuration.task_locking is set and locking has not been disabled for
# this task, the action runs inside the block handed to the locking callable,
# and that callable's return value is read as "lock was established".
# Otherwise the action runs directly.
#
# In both paths the action receives the previously stored return value when
# Configuration.load_task_return_value is set and the action takes exactly one
# argument, and the result is handed to #save_results when #save? is true.
#
# @param execution_time [Time, nil] scheduled time of this run; used in log
#   lines and to keep the lock held for a few seconds. May be omitted, in
#   which case log lines carry only the task name and the lock is not held.
# @return [Object] the last logging/saving call's result (not meaningful)
def call_action(execution_time=nil)
  # Shared by the locked and unlocked paths: invoke the action, then save.
  run_action = lambda do
    if Configuration.load_task_return_value && action.arity == 1
      previous_value = Configuration.load_task_return_value.call(name)
      return_value = action.call(previous_value)
    else
      return_value = action.call
    end
    save_results(return_value) if save?
  end

  return run_action.call unless Configuration.task_locking && !@disable_task_locking

  # execution_time is optional, so only format it when it was supplied;
  # calling to_s(:seconds) on nil would raise before the action ever ran.
  log_prefix = execution_time ? "#{name} - #{execution_time.to_s(:seconds)}" : "#{name}"

  logger.info "#{log_prefix}: attempting to establish lock"
  lock_established = Configuration.task_locking.call(name) do
    run_action.call

    # If a task finishes quickly hold the lock for a few seconds to avoid
    # releasing it before other processes try to pick up the task.
    sleep(1) until Time.now - execution_time > 5 if execution_time
  end
  logger.info "#{log_prefix}: locked by another process" unless lock_established
end

#execute(execution_time) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/recurrent/task.rb', line 18

# Starts one run of the task on a new thread, unless the previous run is
# still alive, in which case the run is handed to #handle_still_running.
#
# The new thread records execution_time under the "execution_time" key of its
# thread-local storage, bumps the scheduler's executing-task counter (when a
# scheduler is attached) and calls #call_action. The action is skipped when
# Configuration.maximum_concurrent_tasks is set and the scheduler reports more
# executing tasks than that limit. Errors raised while running are logged as
# warnings and do not propagate; the counter is always decremented again.
#
# @param execution_time [Time] the scheduled time of this run
# @return [Thread, Object] the spawned thread, or the result of
#   #handle_still_running when the previous run has not finished
def execute(execution_time)
  return handle_still_running(execution_time) if running?

  @thread = Thread.new do
    Thread.current["execution_time"] = execution_time
    scheduler && scheduler.increment_executing_tasks
    begin
      limit = Configuration.maximum_concurrent_tasks
      # Without a scheduler there is no counter to compare against, so the
      # limit cannot apply; guard it like the increment/decrement calls do
      # instead of raising and silently dropping the run.
      over_limit = limit.present? && scheduler && scheduler.executing_tasks > limit
      call_action(execution_time) unless over_limit
    rescue => e
      logger.warn("#{name} - #{e.message}")
      logger.warn(e.backtrace)
    ensure
      scheduler && scheduler.decrement_executing_tasks
    end
  end
end

#handle_still_running(current_time) ⇒ Object



67
68
69
70
71
72
# File 'lib/recurrent/task.rb', line 67

# Logs that the previous run has not finished and, when a slow-task handler is
# configured, notifies it.
#
# @param current_time [Object] the time of the run that is being abandoned
# @return [Object, nil] the handler's result, or nil when none is configured
def handle_still_running(current_time)
  message = "#{name}: Execution from #{thread['execution_time'].to_s(:seconds)} still running, aborting this execution."
  logger.info message

  return unless Configuration.handle_slow_task

  # Handler receives: task name, the skipped run's time, the running run's time.
  Configuration.handle_slow_task.call(name, current_time, thread['execution_time'])
end

#next_occurrence ⇒ Object



74
75
76
77
# File 'lib/recurrent/task.rb', line 74

# Asks the schedule for its next occurrence and moves the schedule's start
# date forward to it.
#
# @return [Object] the occurrence that was just stored as the new start date
def next_occurrence
  schedule.next_occurrence.tap { |upcoming| schedule.start_date = upcoming }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/recurrent/task.rb', line 96

# Tells whether the thread of the most recent execution is still alive.
#
# Always answers with a strict boolean (like #save?): false when no thread
# has been recorded yet, otherwise the thread's own #alive? status.
#
# @return [Boolean]
def running?
  !!(thread && thread.alive?)
end

#save? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/recurrent/task.rb', line 79

# Tells whether this task wants its return value saved, as a strict boolean
# derived from the truthiness of #save.
#
# @return [Boolean]
def save?
  save ? true : false
end

#save_results(return_value) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/recurrent/task.rb', line 83

# Hands the action's return value to the configured persistence callable, or
# logs that none is configured.
#
# The callable receives a hash with :name, :return_value, :executed_at and
# :executed_by (the logger's identifier).
#
# @param return_value [Object] whatever the task's action returned
# @return [Object] the result of the final logger call (not meaningful)
def save_results(return_value)
  logger.info "#{name}: Wants to save its return value."
  if Configuration.save_task_return_value
    # #execute stores the run's time on the thread that performs the run, so
    # the current thread is the authoritative source. Fall back to the task's
    # recorded thread, and tolerate there being none at all (e.g. the action
    # was invoked directly rather than through #execute).
    executed_at = Thread.current['execution_time'] || (thread && thread['execution_time'])

    Configuration.save_task_return_value.call(:name => name,
                                              :return_value => return_value,
                                              :executed_at => executed_at,
                                              :executed_by => logger.identifier)
    logger.info "#{name}: Return value saved."
  else
    logger.info "#{name}: No method to save return values is configured."
  end
end