Class: SerialScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/serial_scheduler.rb,
lib/serial_scheduler/version.rb

Constant Summary collapse

VERSION =
"0.1.0"

Instance Method Summary collapse

Constructor Details

#initialize(logger: Logger.new(STDOUT), error_handler: ->(e) { raise e }) ⇒ SerialScheduler

Returns a new instance of SerialScheduler.



8
9
10
11
12
13
14
# File 'lib/serial_scheduler.rb', line 8

def initialize(logger: Logger.new(STDOUT), error_handler: ->(e) { raise e })
  @logger = logger
  @error_handler = error_handler

  @producers = []
  @stopped = false
end

Instance Method Details

#add(name, interval:, timeout:, &block) ⇒ Object

start a new thread that enqueues an execution at given interval

Raises:

  • (ArgumentError)


17
18
19
20
21
# File 'lib/serial_scheduler.rb', line 17

def add(name, interval:, timeout:, &block)
  raise ArgumentError if interval < 1 || !interval.is_a?(Integer)

  @producers << { name: name, interval: interval, timeout: timeout, block: block, next: 0 }
end

#runObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/serial_scheduler.rb', line 23

def run
  # interval 1s: do not wait
  # interval 1d: if we are 1 hour into the day next execution is in 23 hours
  now = Time.now.to_i
  @producers.each { |p| p[:next] = now + (p[:interval] - (now % p[:interval]) - 1) }

  loop do
    earliest = @producers.min_by { |p| p[:next] }
    wait = [earliest[:next] - Time.now.to_i, 0].max

    if wait > 0
      @logger.info message: "Waiting to start job", job: earliest[:name], time: wait
      wait.times do
        break if @stopped

        sleep 1
      end
    end
    break if @stopped

    earliest[:next] += earliest[:interval]
    execute_in_fork earliest
  end
end

#stopObject



48
49
50
# File 'lib/serial_scheduler.rb', line 48

def stop
  @stopped = true
end