Class: When::Do

Inherits:
Object
  • Object
show all
Defined in:
lib/when-do/do.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Do

Returns a new instance of Do.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/when-do/do.rb', line 10

def initialize(opts={})
  @logger            = init_logger(opts[:log_path], opts[:log_level])

  Process.daemon(true) if opts.has_key?(:daemonize)

  @pid_file_path = opts[:pid_file_path]
  if pid_file_path
    File.open(pid_file_path, 'w') { |f| f.write(Process.pid) }
  end

  redis_opts         = opts[:redis_opts]        || {}
  @redis             = Redis.new(redis_opts)

  @schedule_key      = opts[:schedule_key]      || 'when:schedules'
  @worker_queue_key  = opts[:worker_queue_key]  || 'when:queue:default'
  @delayed_queue_key = opts[:delayed_queue_key] || 'when:delayed'
end

Instance Attribute Details

#delayed_queue_keyObject (readonly)

Returns the value of attribute delayed_queue_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def delayed_queue_key
  @delayed_queue_key
end

#loggerObject (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/when-do/do.rb', line 8

def logger
  @logger
end

#pid_file_pathObject (readonly)

Returns the value of attribute pid_file_path.



8
9
10
# File 'lib/when-do/do.rb', line 8

def pid_file_path
  @pid_file_path
end

#redisObject (readonly)

Returns the value of attribute redis.



8
9
10
# File 'lib/when-do/do.rb', line 8

def redis
  @redis
end

#schedule_keyObject (readonly)

Returns the value of attribute schedule_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def schedule_key
  @schedule_key
end

#worker_queue_keyObject (readonly)

Returns the value of attribute worker_queue_key.



8
9
10
# File 'lib/when-do/do.rb', line 8

def worker_queue_key
  @worker_queue_key
end

Instance Method Details

#analyzeObject



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/when-do/do.rb', line 65

def analyze
  ['HUP', 'INT', 'TERM', 'QUIT'].each { |sig| Signal.trap(sig) { }}
  started_at = Time.now
  if running?(started_at)
    logger.info('Another process is already analyzing.')
  else
    logger.debug { 'Analyzing.' }
    queue_scheduled(started_at)
    queue_delayed(started_at)
  end
  exit
end

#analyze_in_child_processObject



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/when-do/do.rb', line 52

def analyze_in_child_process
  if pid = fork
    Thread.new {
      pid, status = Process.wait2(pid)
      if status.exitstatus != 0
        raise "Child (pid: #{pid} exited with non-zero status. Check logs."
      end
    }.abort_on_exception = true
  else
    analyze
  end
end

#build_day_key(started_at) ⇒ Object



92
93
94
# File 'lib/when-do/do.rb', line 92

def build_day_key(started_at)
  "#{schedule_key}:#{started_at.to_s.split(' ')[0]}"
end

#build_min_key(started_at) ⇒ Object



96
97
98
# File 'lib/when-do/do.rb', line 96

def build_min_key(started_at)
  "#{started_at.hour}:#{started_at.min}"
end

#enqueue(jobs) ⇒ Object



125
126
127
128
129
130
131
132
133
# File 'lib/when-do/do.rb', line 125

def enqueue(jobs)
  jobs.each do |job|
    logger.info("Queueing: #{job}")
  end
  success = redis.lpush(worker_queue_key, jobs)
  unless  success > 0
    raise "Failed to queue all jobs. Redis returned #{success}."
  end
end

#queue_delayed(started_at) ⇒ Object



115
116
117
118
119
120
121
122
123
# File 'lib/when-do/do.rb', line 115

def queue_delayed(started_at)
  logger.info("Checking for delayed jobs.")
  delayed_jobs = redis.multi do
    redis.zrevrangebyscore(delayed_queue_key, started_at.to_i, '-inf')
    redis.zremrangebyscore(delayed_queue_key, '-inf', started_at.to_i)
  end[0]
  logger.debug { "Found #{delayed_jobs.count} delayed jobs." }
  enqueue(delayed_jobs) if delayed_jobs.any?
end

#queue_scheduled(started_at) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/when-do/do.rb', line 100

def queue_scheduled(started_at)
  schedules = redis.hvals(schedule_key)
  logger.info("Analyzing #{schedules.count} schedules.")
  scheduled_jobs = schedules.inject([]) do |jobs, s|
    schedule = JSON.parse(s)
    cron = When::Cron.new(schedule['cron'])
    if cron == started_at
      jobs << { 'jid' => SecureRandom.uuid, 'class' => schedule['class'], 'args' => schedule['args'] }.to_json
    end
    jobs
  end
  logger.debug { "Found #{scheduled_jobs.count} schedules due to be queued." }
  enqueue(scheduled_jobs) if scheduled_jobs.any?
end

#running?(started_at) ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/when-do/do.rb', line 78

def running?(started_at)
  day_key = build_day_key(started_at)
  min_key = build_min_key(started_at)

  logger.debug { "Checking Redis using day_key: '#{day_key}' and min_key: '#{min_key}'"}
  check_and_set_analyzed = redis.multi do
    redis.hget(day_key, min_key)
    redis.hset(day_key, min_key, 't')
    redis.expire(day_key, 60 * 60 * 24)
  end

  check_and_set_analyzed[0]
end

#start_loopObject



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/when-do/do.rb', line 28

def start_loop
  logger.info("Starting...")
  logger.info { "Schedule key: '#{schedule_key}', worker queue key: '#{worker_queue_key}', delayed queue key: '#{delayed_queue_key}'" }
  logger.info { "PID file: #{pid_file_path}" } if pid_file_path

  loop do
    sleep_until_next_minute
    logger.debug { "Using #{`ps -o rss -p #{Process.pid}`.chomp.split("\n").last.to_i} kb of memory." }
    analyze_in_child_process
  end

rescue SystemExit => e
  raise e

rescue SignalException => e
  logger.info(e.inspect)
  File.delete(pid_file_path) if pid_file_path
  raise e

rescue Exception => e
  ([e.inspect] + e.backtrace).each { |line| logger.fatal(line) }
  raise e
end