Class: CronR::Cron

Inherits:
Array
  • Object
Defined in:
lib/CronR/Cron.rb

Overview

Cron is an Array subclass that represents a cron table, together with the machinery needed to run it.

cron = Cron.new
# Or
cron = Cron.new(job1,job2,...)

To start cron:

cron.start

Jobs are normally instances of CronJob (which are little more than glorified hashes). Strictly speaking, Cron only expects each job to respond to:

runnable?(time) => if we can run now
once? => this job is a one-off
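
As a rough illustration of that interface, here is a minimal sketch of a duck-typed job. OneOffJob is a hypothetical class, not part of CronR. It subclasses Hash because #run (listed further down) also sets cron_job[:delete] on one-off jobs, and it returns a pair from runnable? to match the `ok,details` destructuring there; whether CronJob itself returns a pair is an assumption.

```ruby
# Hypothetical stand-in for CronJob: a Hash that answers the two
# questions Cron asks of every entry in the table.
class OneOffJob < Hash
  def initialize(run_at)
    super()
    self[:run_at] = run_at
  end

  # Returns [ok, details]; ok is true once run_at has been reached.
  def runnable?(time)
    [time >= self[:run_at], { due: self[:run_at] }]
  end

  # A one-off job is dropped from the table after it has been enqueued.
  def once?
    true
  end
end

job = OneOffJob.new(Time.at(0))
ok, _details = job.runnable?(Time.now)
```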

Cron uses Ruby’s Time object to establish what the current time is.

cron.time => current time

You can, however, override cron’s idea of the current time, e.g. using ActiveSupport time zones:

cron.time {
  Time.use_zone("Sydney") {
    Time.zone.now  
  }
}

cron.time => <the time in new timezone>
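
The source listing for #time further down reads @timezone rather than storing a block, so the block form above may belong to a different version of the library. Purely as a sketch of how a block override of this kind could behave, here is a hypothetical Clock class that uses only the standard library (a fixed UTC+10 offset stands in for an ActiveSupport zone):

```ruby
# Hypothetical clock, not CronR's implementation: it remembers the most
# recent block passed to #time and uses it for later calls.
class Clock
  def initialize
    @source = nil
  end

  # With a block: store it as the new time source.
  # Either way: return the time from the stored source, falling back
  # to Time.now when no source has been set.
  def time(&block)
    @source = block if block
    @source ? @source.call : Time.now
  end
end

clock = Clock.new
default_time = clock.time                     # plain Time.now
clock.time { Time.now.getlocal("+10:00") }    # install a UTC+10 source
zoned_time = clock.time                       # now produced by the block
```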

To alter cron after it’s started (remember, it’s just an array):

cron.suspend {|arr|
  ... do something with cron ...
  ... arr == this cron instance ...
  arr << cron_job  # for example
}

#suspend waits until the thread dispatching jobs is safely sleeping before it calls the block.

To gracefully stop cron:

cron.stop {
   ... do something now we've stopped ...
}

RUNNABLE ITEMS

If a runnable item carries a proc, Cron does not run it; the thread used in Cron is only for scheduling. Runnable items are instead enqueued on @queue. You can create a thread and have it dequeue from this queue.

Thread.new {
  loop {
    job = cron.queue.deq
    job.run  # Using CronJob#run .
    ...
  }
}
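
The loop above never exits. A self-contained sketch of the same consumer pattern with a clean shutdown might look like the following. PrintJob and the :shutdown sentinel are hypothetical, and a plain Queue stands in for cron.queue:

```ruby
# Hypothetical job type standing in for CronJob; only #run matters here.
PrintJob = Struct.new(:name) do
  def run
    "ran #{name}"
  end
end

queue   = Queue.new   # stands in for cron.queue
results = []

worker = Thread.new do
  # :shutdown is an assumed sentinel value, not something CronR defines.
  while (job = queue.deq) != :shutdown
    results << job.run
  end
end

queue.enq(PrintJob.new("backup"))
queue.enq(PrintJob.new("report"))
queue.enq(:shutdown)
worker.join   # returns once the sentinel has been consumed
```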

You can also replace the queue with another object. It should respond to #enq and #deq and be thread-safe. For example, #enq might insert a record into a database table.
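
A minimal sketch of such a replacement, assuming nothing beyond #enq and #deq is required: AuditedQueue is a hypothetical class that records each enqueue in an in-memory log (a stand-in for the table insert) before handing the item to a regular Queue.

```ruby
# Hypothetical replacement queue, not part of CronR.
class AuditedQueue
  attr_reader :log

  def initialize
    @inner = Queue.new   # provides the blocking, thread-safe hand-off
    @log   = []
    @lock  = Mutex.new   # guards @log against concurrent writers
  end

  # Record the item, then make it available to consumers.
  def enq(item)
    @lock.synchronize { @log << [Time.now, item] }
    @inner.enq(item)
    self
  end

  # Blocks until an item is available, like Queue#deq.
  def deq
    @inner.deq
  end
end

q = AuditedQueue.new
q.enq(:job_a)
q.enq(:job_b)
first = q.deq
```

Since #queue is not marked read-only in the attribute list, assigning something like `cron.queue = AuditedQueue.new` before #start is presumably how a replacement would be installed; that is not verified here.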

Constructor Details

#initialize(*jobs) ⇒ Cron

*jobs should consist of 0 or more items of the form:

[job_id(string),CronJob.new(...),thing]


# File 'lib/CronR/Cron.rb', line 92

def initialize *jobs
  super()
  @stopped = true
  @suspended = false
  @queue = Queue.new
  jobs.each{|job|
    self.push(job)
  }
  @mutex = Mutex.new
  @timezone = nil
end

Instance Attribute Details

#debug ⇒ Object

Returns the value of attribute debug.



# File 'lib/CronR/Cron.rb', line 86

def debug
  @debug
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



# File 'lib/CronR/Cron.rb', line 85

def mutex
  @mutex
end

#queue ⇒ Object

Returns the value of attribute queue.



# File 'lib/CronR/Cron.rb', line 86

def queue
  @queue
end

#stopped ⇒ Object (readonly)

Returns the value of attribute stopped.



# File 'lib/CronR/Cron.rb', line 85

def stopped
  @stopped
end

#suspended ⇒ Object (readonly)

Returns the value of attribute suspended.



# File 'lib/CronR/Cron.rb', line 85

def suspended
  @suspended
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



# File 'lib/CronR/Cron.rb', line 85

def thread
  @thread
end

#timezone ⇒ Object

Returns the value of attribute timezone.



# File 'lib/CronR/Cron.rb', line 87

def timezone
  @timezone
end

Instance Method Details

#run(time = nil) ⇒ Object

Check each item in this array; if it is runnable, push it onto the queue.

We assume that each item is, or is similar to, a CronJob. We don’t call cron_job#job; that is work for another thread, especially if #job is a Proc.



# File 'lib/CronR/Cron.rb', line 124

def run time=nil

  puts "[cron] run called #{Time.now}" if @debug
  time = self.time if time.nil?
  self.each { |cron_job|
    ok,details = cron_job.runnable?(time)
    if ok then
      @queue.enq(cron_job)
      if cron_job.once? then
        cron_job[:delete] = true
      end
    end
  }
  self.reject! { |cron_job|
    cron_job[:delete]
  }
end
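
To see the enqueue-then-sweep behaviour in isolation, here is a stripped-down sketch that mirrors the listing above without CronR itself. MiniTable and FlagJob are hypothetical names, and the debug output and default-time handling are omitted:

```ruby
# Hypothetical miniature of the cron table: an Array with a queue.
class MiniTable < Array
  attr_reader :queue

  def initialize(*jobs)
    super()
    @queue = Queue.new
    jobs.each { |job| push(job) }
  end

  # Enqueue every runnable job, flag one-off jobs, then sweep them out.
  def run(time)
    each do |job|
      ok, _details = job.runnable?(time)
      next unless ok
      @queue.enq(job)
      job[:delete] = true if job.once?
    end
    reject! { |job| job[:delete] }
  end
end

# Hypothetical job that is always runnable; :once decides its lifetime.
class FlagJob < Hash
  def initialize(once)
    super()
    self[:once] = once
  end

  def runnable?(_time)
    [true, nil]
  end

  def once?
    self[:once]
  end
end

table = MiniTable.new(FlagJob.new(true), FlagJob.new(false))
table.run(Time.now)
```

After one pass both jobs have been enqueued, but only the recurring job remains in the table.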

#start(debug = false, method = :every_minute, *args) ⇒ Object

Start cron.

With the default method (:every_minute), the dispatch thread wakes up every minute and performs #run, unless cron is suspended or stopped.



# File 'lib/CronR/Cron.rb', line 146

def start debug=false,method=:every_minute,*args
  @stopped = false
  @suspended = false
  @dead = Queue.new
  @thread = CronR::Utils.send(method,debug,*args) {
    time = self.time
    @mutex.synchronize {
      if @stopped then
        # It's important we put something on this queue ONLY AFTER
        # we've acquired the mutex...
        @dead.enq(true)
        true
      elsif @suspended then
      else
        self.run(time)
      end
    }
  }
end

#stop(&block) ⇒ Object

Gracefully stop the thread.

If a block is given, it is called once the thread has stopped. In the source shown here, #stop does nothing when called without a block.



# File 'lib/CronR/Cron.rb', line 190

def stop &block
  if block_given? then
    @stopped = true
    @suspended = false
    # Wait till something is put on the dead queue...
    # This stops us from acquiring the mutex until after @thread
    # has processed @stopped set to true.
    sig = @dead.deq
    # The cron thread should be dead now, or wrapping up (with the
    # acquired mutex)... 
    @mutex.synchronize {
      while @thread.alive?
        sleep 0.2
      end
      block.call(self)
    }
  end
end
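
The hand-off between #start and #stop can be sketched without CronR as follows. MiniLoop is a hypothetical class; it assumes the scheduling loop exits when its block returns a truthy value (CronR::Utils is not shown here, so that is a guess), uses a short sleep instead of a one-minute tick, and swaps the alive? polling for Thread#join:

```ruby
# Hypothetical miniature of the start/stop handshake, not CronR's code.
class MiniLoop
  attr_reader :thread

  def initialize
    @mutex   = Mutex.new
    @stopped = true
  end

  def start(interval = 0.01)
    @stopped = false
    @dead    = Queue.new
    @thread  = Thread.new do
      loop do
        done = @mutex.synchronize do
          if @stopped
            @dead.enq(true)   # signal only while holding the mutex
            true
          end
        end
        break if done
        sleep interval
      end
    end
  end

  def stop
    @stopped = true
    @dead.deq                 # blocks until the loop has seen @stopped
    @mutex.synchronize do     # acquired only after the loop releases it
      @thread.join
      yield self if block_given?
    end
  end
end

ticker = MiniLoop.new
ticker.start
stopped_cleanly = false
ticker.stop { stopped_cleanly = true }
```

Waiting on the dead queue before taking the mutex ensures the stopping thread cannot grab the mutex ahead of the loop's final pass.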

#suspend(&block) ⇒ Object

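Suspend the thread.

If a block is given, it is called only once the thread is sleeping again, with this cron instance as its argument. You can use this to safely modify this array. In the source shown here, #suspend does nothing when called without a block.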



# File 'lib/CronR/Cron.rb', line 172

def suspend &block
  if block_given? then
    @mutex.synchronize {
      begin
        @suspended = true
        block.call(self)
      ensure
        @suspended = false
      end
    }
  end
end

#time ⇒ Object

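Get the current time.

If passed a block, the block will be used to get the time. Note that the source shown here does not reference a block: it returns the time in @timezone (via Time.use_zone) when that attribute is set, and Time.now otherwise.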



# File 'lib/CronR/Cron.rb', line 108

def time
  if @timezone then
    Time.use_zone(@timezone) {
      Time.zone.now
    }
  else
    Time.now
  end
end