Class: Recurrent::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/recurrent/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/recurrent/worker.rb', line 6

# Builds a worker: copies three settings from +options+ into Configuration,
# runs the optional Configuration.setup hook, creates the Scheduler and,
# when +:every+ is given, registers one recurring task on it.
#
# @param options [Hash] keys read here: :maximum_concurrent_tasks,
#   :pool_size, :locker_pool_size, :file, :every, :name, :ruby, :system
# @return [Worker] a new instance of Worker
def initialize(options={})
  # Assigned unconditionally: a key missing from +options+ stores nil.
  Configuration.maximum_concurrent_tasks = options[:maximum_concurrent_tasks]
  Configuration.pool_size = options[:pool_size]
  Configuration.locker_pool_size = options[:locker_pool_size]
  # Optional hook; only invoked when one has been configured.
  Configuration.setup.call if Configuration.setup
  file = options[:file]
  @scheduler = Scheduler.new(file)
  if options[:every]
    # SECURITY: +:every+ is evaluated as Ruby source and the result coerced
    # with to_i. Only pass trusted strings. The evaluated code runs in this
    # method's binding, so it can see the locals defined here.
    every = eval(options[:every]).to_i
    if options[:ruby]
      @scheduler.every(every, options[:name]) do
        # SECURITY: +:ruby+ is evaluated as Ruby source each time the
        # scheduler runs this block. Only pass trusted strings.
        eval(options[:ruby])
      end
    elsif options[:system]
      @scheduler.every(every, options[:name]) do
        # Handed to Kernel#system as a single command string.
        system(options[:system])
      end
    end
    # With +:every+ but neither +:ruby+ nor +:system+, nothing is scheduled.
  end
  # The worker reuses the scheduler's logger for its own messages.
  @logger = scheduler.logger
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



4
5
6
# File 'lib/recurrent/worker.rb', line 4

# Reader for the @logger instance variable.
#
# @return [Object] the current value of @logger (nil when unset)
def logger; @logger; end

#scheduler ⇒ Object

Returns the value of attribute scheduler.



4
5
6
# File 'lib/recurrent/worker.rb', line 4

# Reader for the @scheduler instance variable.
#
# @return [Object] the current value of @scheduler (nil when unset)
def scheduler; @scheduler; end

Instance Method Details

#execute ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/recurrent/worker.rb', line 44

# Main loop: repeatedly looks up the next execution time, waits for it and
# runs every task scheduled for that time.
#
# The global $exit flag is checked before waiting, after waiting and after
# running the tasks. Once it is set, the loop ends as soon as
# wait_for_running_tasks returns a truthy value.
#
# @return [nil] once the loop has been left
def execute
  loop do
    run_at = scheduler.tasks.next_execution_time
    # Frequency sorting is requested only when a concurrency limit is
    # configured; the returned list is processed in reverse order.
    due_tasks = scheduler.tasks.scheduled_to_execute_at(
      run_at,
      :sort_by_frequency => !!Configuration.maximum_concurrent_tasks
    ).reverse

    break if $exit && wait_for_running_tasks

    wait_until(run_at)

    break if $exit && wait_for_running_tasks

    due_tasks.each do |task|
      logger.info "#{task.name}: Executing at #{run_at.to_s(:seconds)}"
      task.execute(run_at)
    end

    break if $exit && wait_for_running_tasks
  end
end

#execute_with_locking ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/recurrent/worker.rb', line 64

# Runs #execute inside the block handed to Configuration.process_locking.
#
# The locking callable receives every task name plus a block that calls
# #execute. While its return value is falsy the attempt is repeated after a
# five-second pause. The global $exit flag ends the loop both before an
# attempt and right after one.
#
# @return [nil]
def execute_with_locking
  acquired = nil
  until acquired || $exit
    acquired = Configuration.process_locking.call(*scheduler.tasks.map(&:name)) { execute }
    break if $exit

    logger.info 'Tasks are being monitored by another process. Standing by.'
    sleep(5)
  end
end

#start ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/recurrent/worker.rb', line 28

# Entry point: logs a start message, installs handlers for TERM, INT and
# QUIT, runs the main loop (with process locking when
# Configuration.process_locking is set) and logs a final message.
#
# Each signal handler logs a notice and sets the global $exit flag, which
# the loops in this class poll.
#
# NOTE(review): the handlers log from inside a trap context; some logger
# implementations may refuse to write there - confirm for the logger in use.
#
# @return [Object] whatever the final logger.info call returns
def start
  logger.info "Starting Recurrent"

  %w[TERM INT QUIT].each do |signal|
    trap(signal) do
      logger.info 'Waiting for running tasks and exiting...'
      $exit = true
    end
  end

  Configuration.process_locking ? execute_with_locking : execute

  logger.info("Goodbye.")
end

#wait_for_running_tasks ⇒ Object



77
78
79
80
81
82
83
# File 'lib/recurrent/worker.rb', line 77

# Waits for running tasks using the strategy selected by
# Configuration.wait_for_running_tasks_on_exit_for: a bounded wait when the
# setting is truthy, an unbounded wait otherwise.
#
# @return [Object] the result of the chosen wait method
def wait_for_running_tasks
  return wait_for_running_tasks_indefinitely unless Configuration.wait_for_running_tasks_on_exit_for

  wait_for_running_tasks_for(Configuration.wait_for_running_tasks_on_exit_for)
end

#wait_for_running_tasks_for(seconds) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/recurrent/worker.rb', line 85

# Gives running tasks a grace period, counted in one-second sleeps, and
# kills whatever is still running once that period has elapsed.
#
# @param seconds [Numeric] length of the grace period; zero, negative and
#   fractional values are accepted and simply expire on the first pass
# @return [true] once no task is reported as running
def wait_for_running_tasks_for(seconds)
  while scheduler.tasks.running.any?
    logger.info "Killing running tasks in #{seconds.inspect}."
    seconds -= 1
    sleep(1)
    # Compare with `> 0` rather than testing for exactly zero: a countdown
    # that starts at 0, below 0, or at a fractional value never lands on 0
    # and would otherwise loop forever without killing anything. It also
    # retries the kill on later passes if a task is still listed as running.
    next if seconds > 0

    scheduler.tasks.running.each do |task|
      logger.info "Killing #{task.name}."
      # Drop the thread reference unless the killed thread still reports
      # itself alive (a missing thread is cleared as well).
      task.thread = nil unless task.thread.try(:kill).try(:alive?)
    end
  end
  true
end

#wait_for_running_tasks_indefinitely ⇒ Object



100
101
102
103
104
105
106
107
108
109
# File 'lib/recurrent/worker.rb', line 100

# Joins running tasks one at a time, with no time limit, until the scheduler
# reports none left.
#
# On each pass the first running task is logged and its thread (if any) is
# joined; the running list is then queried again.
#
# @return [true] after the final "all finished" message has been logged
def wait_for_running_tasks_indefinitely
  while (pending = scheduler.tasks.running.first)
    logger.info "Waiting for #{pending.name} to finish."
    pending.thread.try(:join)
  end

  logger.info "All tasks finished, exiting..."
  true
end

#wait_until(time) ⇒ Object



111
112
113
114
115
116
# File 'lib/recurrent/worker.rb', line 111

# Blocks in half-second naps until +time+ reports itself as past, or until
# the global $exit flag is set.
#
# @param time [#past?] the moment to wait for
# @return [nil]
def wait_until(time)
  sleep(0.5) until time.past? || $exit
end