Class: Recurrent::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/recurrent/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/recurrent/worker.rb', line 6

# Builds a worker: applies configuration, creates the scheduler and, when
# requested, registers one ad-hoc recurring task described by +options+.
#
# @param options [Hash] settings read by this constructor:
#   :maximum_concurrent_tasks - assigned to Configuration.maximum_concurrent_tasks
#                               as-is (nil when the key is absent)
#   :file   - handed to Scheduler.new
#   :every  - String of Ruby code; eval'd and converted with #to_i to obtain
#             the interval (presumably seconds - TODO confirm)
#   :name   - second argument given to Scheduler#every for the ad-hoc task
#   :ruby   - String of Ruby code eval'd inside the task block
#             (checked first, so it wins over :system)
#   :system - command given to Kernel#system inside the task block
#
# NOTE(review): :every and :ruby are passed to eval. That is only safe if
# these options come from a trusted source (e.g. the operator's own command
# line) - confirm against the callers.
def initialize(options={})
  Configuration.maximum_concurrent_tasks = options[:maximum_concurrent_tasks]
  # Run the configured setup hook, if there is one.
  Configuration.setup.call if Configuration.setup
  file = options[:file]
  @scheduler = Scheduler.new(file)
  if options[:every]
    # eval runs with this method's local binding, so the evaluated code can
    # see +options+, +file+ and +every+.
    every = eval(options[:every]).to_i
    if options[:ruby]
      @scheduler.every(every, options[:name]) do
        eval(options[:ruby])
      end
    elsif options[:system]
      @scheduler.every(every, options[:name]) do
        system(options[:system])
      end
    end
    # When :every is given without :ruby or :system, nothing is registered.
  end
  # Reuse the scheduler's logger for the worker's own messages.
  @logger = scheduler.logger
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



4
5
6
# File 'lib/recurrent/worker.rb', line 4

# Reader for the worker's logger (the @logger instance variable).
def logger; @logger; end

#scheduler ⇒ Object

Returns the value of attribute scheduler.



4
5
6
# File 'lib/recurrent/worker.rb', line 4

# Reader for the worker's scheduler (the @scheduler instance variable).
def scheduler; @scheduler; end

Instance Method Details

#execute ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/recurrent/worker.rb', line 42

# Main scheduling loop.
#
# Each pass asks the scheduler for the next execution time and the tasks due
# at that time, waits until then, and runs the tasks. The global $exit flag
# is checked before waiting, after waiting and after running; once it is set
# the worker waits for running tasks and leaves the loop.
#
# @return [nil]
def execute
  loop do
    run_at = scheduler.tasks.next_execution_time
    # Frequency ordering is requested only when a concurrency cap is configured.
    by_frequency = Configuration.maximum_concurrent_tasks ? true : false
    due_tasks = scheduler.tasks.scheduled_to_execute_at(run_at, :sort_by_frequency => by_frequency).reverse

    break if $exit && wait_for_running_tasks

    wait_until(run_at)

    # wait_until returns early when $exit is set, so check again before running.
    break if $exit && wait_for_running_tasks

    due_tasks.each do |due|
      logger.info "#{due.name}: Executing at #{run_at.to_s(:seconds)}"
      due.execute(run_at)
    end

    break if $exit && wait_for_running_tasks
  end
end

#execute_with_locking ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/recurrent/worker.rb', line 62

# Runs #execute under the configured process lock.
#
# Configuration.process_locking is called with every task name and a block
# that runs #execute. Its return value decides whether to keep retrying:
# a falsy result leads to another attempt after a 5 second pause. The global
# $exit flag ends the retry loop before and after each attempt.
#
# @return [nil]
def execute_with_locking
  loop do
    break if $exit

    locker = Configuration.process_locking
    task_names = scheduler.tasks.map(&:name)
    acquired = locker.call(*task_names) { execute }

    break if $exit

    logger.info 'Tasks are being monitored by another process. Standing by.'
    sleep(5)

    break if acquired
  end
end

#start ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/recurrent/worker.rb', line 26

# Entry point: installs signal handlers and runs the scheduling loop.
#
# TERM, INT and QUIT all get the same handler, which logs a shutdown notice
# and sets the global $exit flag that the loops in this class poll. The loop
# runs under the process lock when Configuration.process_locking is set,
# and directly otherwise.
def start
  logger.info "Starting Recurrent"

  %w(TERM INT QUIT).each do |signal|
    trap(signal) { logger.info 'Waiting for running tasks and exiting...'; $exit = true }
  end

  Configuration.process_locking ? execute_with_locking : execute

  logger.info("Goodbye.")
end

#wait_for_running_tasks ⇒ Object



75
76
77
78
79
80
81
# File 'lib/recurrent/worker.rb', line 75

# Waits for running tasks before shutdown.
#
# When Configuration.wait_for_running_tasks_on_exit_for is set, that value is
# passed to #wait_for_running_tasks_for; otherwise the wait is unbounded.
#
# @return [Object] whatever the chosen wait method returns
def wait_for_running_tasks
  return wait_for_running_tasks_indefinitely unless Configuration.wait_for_running_tasks_on_exit_for

  wait_for_running_tasks_for(Configuration.wait_for_running_tasks_on_exit_for)
end

#wait_for_running_tasks_for(seconds) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/recurrent/worker.rb', line 83

# Gives running tasks a grace period to finish, then kills their threads.
#
# While any task is still running, logs the remaining time, sleeps one second
# and decrements the counter. Once the counter reaches zero or below, every
# task still running has its thread killed; the task's thread reference is
# cleared unless the thread still reports itself alive right after the kill.
#
# @param seconds [Numeric] grace period, counted down in one-second steps
# @return [true] once no task is running any more
def wait_for_running_tasks_for(seconds)
  while scheduler.tasks.running.any? do
    logger.info "Killing running tasks in #{seconds.inspect}."
    seconds -= 1
    sleep(1)

    # Compare with > 0 instead of == 0: a fractional, zero or negative grace
    # period never lands exactly on zero, and a thread that is still alive
    # right after #kill needs another attempt on the next pass. An equality
    # check fires at most once and would leave this loop running forever.
    next if seconds > 0

    scheduler.tasks.running.each do |task|
      logger.info "Killing #{task.name}."
      task.thread = nil unless task.thread.try(:kill).try(:alive?)
    end
  end
  true
end

#wait_for_running_tasks_indefinitely ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/recurrent/worker.rb', line 98

# Blocks until no task is running, with no time limit.
#
# Repeatedly takes the first running task, logs its name and joins its
# thread (when it has one), until the running list is empty.
#
# @return [true]
def wait_for_running_tasks_indefinitely
  while (pending = scheduler.tasks.running.first)
    logger.info "Waiting for #{pending.name} to finish."
    pending.thread.try(:join)
  end

  logger.info "All tasks finished, exiting..."
  true
end

#wait_until(time) ⇒ Object



109
110
111
112
113
114
# File 'lib/recurrent/worker.rb', line 109

# Sleeps in half-second steps until +time+ reports itself as past, or until
# the global $exit flag is set.
#
# @param time [#past?] the moment to wait for
# @return [nil]
def wait_until(time)
  sleep(0.5) until time.past? || $exit
end