Class: Threasy::Schedule

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/threasy/schedule.rb

Defined Under Namespace

Classes: Entry

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(work = nil) ⇒ Schedule

Creates new ‘Threasy::Schedule` instance

Parameters

  • ‘work` - Optional. Usually a `Threasy::Work` instance.

    Defaults to `Threasy.work`
    


33
34
35
36
37
38
# File 'lib/threasy/schedule.rb', line 33

def initialize(work = nil)
  @work = work
  @semaphore = Mutex.new
  @schedules = []
  @watcher = Thread.new{ watch }
end

Instance Attribute Details

#schedulesObject (readonly)

Returns the value of attribute schedules.



25
26
27
# File 'lib/threasy/schedule.rb', line 25

def schedules
  @schedules
end

#watcherObject (readonly)

Returns the value of attribute watcher.



25
26
27
# File 'lib/threasy/schedule.rb', line 25

def watcher
  @watcher
end

Instance Method Details

#add(*args, &block) ⇒ Object

Schedule a job

Examples

schedule = Threasy::Schedule.new(work: Threasy::Work.new)

# Schedule blocks
schedule.add(in: 5.min) { do_some_background_work }

# Schedule job objects compatible with the `work` queue
schedule.add(BackgroundJob.new(some: data), every: 1.hour)

# Enqueue strings that can be evals to a job object
schedule.add("BackgroundJob.new", every: 1.day)

Parameters

  • ‘job` - Job object which responds to `perform` or `call`

  • ‘options`

    • ‘every: n` - If present, job is repeated every `n` seconds

    • ‘in: n` - `n` seconds until job is executed

    • ‘at: Time` - Time to execute job at

  • ‘&block` - Job block

Must have either a ‘job` object or job `&block` present.

Returns

  • ‘Threasy::Schedule::Entry` if job was successfully added to schedule

  • ‘nil` if job was for the past



70
71
72
73
74
75
# File 'lib/threasy/schedule.rb', line 70

def add(*args, &block)
  options = args.last.is_a?(Hash) ? args.pop : {}
  job = block_given? ? block : args.first
  entry = Entry.new job, {schedule: self}.merge(options)
  add_entry entry if entry.future?
end

#add_entry(entry) ⇒ Object

Add a ‘Threasy::Schedule::Entry` object to `schedules`



78
79
80
81
82
83
84
85
# File 'lib/threasy/schedule.rb', line 78

def add_entry(entry)
  sync do
    schedules << entry
    schedules.sort_by!(&:at)
  end
  tickle_watcher
  entry
end

#clearObject



109
110
111
112
# File 'lib/threasy/schedule.rb', line 109

def clear
  log.debug "Clearing schedules"
  sync { schedules.clear }
end

#countObject



145
146
147
# File 'lib/threasy/schedule.rb', line 145

def count
  schedules.count
end

#eachObject



105
106
107
# File 'lib/threasy/schedule.rb', line 105

def each
  schedules.each { |entry| yield entry }
end

#entries_dueObject

Pop entries off the schedule that are due



135
136
137
138
139
140
141
142
143
# File 'lib/threasy/schedule.rb', line 135

def entries_due
  [].tap do |entries|
    sync do
      while schedules.first && schedules.first.due?
        entries << schedules.shift
      end
    end
  end
end

#logObject



149
150
151
# File 'lib/threasy/schedule.rb', line 149

def log
  Threasy.logger
end

#max_sleepObject



153
154
155
# File 'lib/threasy/schedule.rb', line 153

def max_sleep
  Threasy.config.max_sleep
end

#remove_entry(entry) ⇒ Object



92
93
94
# File 'lib/threasy/schedule.rb', line 92

def remove_entry(entry)
  sync { schedules.delete entry }
end

#syncObject



101
102
103
# File 'lib/threasy/schedule.rb', line 101

def sync
  @semaphore.synchronize { yield }
end

#tickle_watcherObject

Wakes up the watcher thread if its sleeping



97
98
99
# File 'lib/threasy/schedule.rb', line 97

def tickle_watcher
  watcher.wakeup if watcher.stop?
end

#watchObject

Used by the watcher thread to find jobs that are due, add them to the ‘work` queue, re-sort the schedule, and attempt to sleep until the next job is due.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/threasy/schedule.rb', line 117

def watch
  loop do
    Thread.stop if schedules.empty?
    entries_due.each do |entry|
      log.debug "Adding scheduled job to work queue"
      entry.work!
      add_entry entry if entry.repeat?
    end
    next_job = @schedules.first
    if next_job && next_job.future?
      seconds = [next_job.at - Time.now, max_sleep].min
      log.debug "Schedule watcher sleeping for #{seconds} seconds"
      sleep seconds
    end
  end
end

#workObject

Returns the current work queue



88
89
90
# File 'lib/threasy/schedule.rb', line 88

def work
  @work ||= Threasy.work
end