Class: Updater::Update
- Inherits: Object
- Includes: DataMapper::Resource
- Defined in: lib/updater/update.rb, lib/updater/update_dm.rb
Overview
The basic class that drives Updater.
Class Attribute Summary
-
.finder_id ⇒ Object
This is the application level default method to call on an instance type target.
-
.finder_method ⇒ Object
This is the application level default method to call on a class in order to find/create a target instance.
-
.logger ⇒ Object
Returns the logger instance.
-
.orm ⇒ Object
This attribute must be set to some ORM that will persist the data.
-
.socket ⇒ Object
This is an open IO socket that will be written to when a job is scheduled.
Instance Attribute Summary
-
#error ⇒ Object
readonly
Contains the error caught in run.
-
#orm ⇒ Object
readonly
Contains the underlying ORM instance (e.g. ORM::Datamapper or ORM Mongo).
-
#params ⇒ Object
To reduce the proliferation of chained jobs in the queue, a chain request may carry a params value that passes specific values to the chained method.
Class Method Summary
-
.at(time, target, method, args = [], options = {}) ⇒ Object
Request that the target be sent the method with args at the given time.
-
.chain(*args) ⇒ Object
Like at, but without a time to run.
-
.clear_all ⇒ Object
Remove all scheduled jobs.
-
.clear_locks(worker) ⇒ Object
Ensure that a worker no longer holds any locks.
-
.current ⇒ Object
A filter for all requests that are ready to run, that is, they were requested to run at or before time.now.
-
.delayed ⇒ Object
A filter for all requests that are not yet ready to run, that is, their time is after time.now.
-
.for(target, name = nil) ⇒ Object
Retrieves all updates for a conforming target possibly limiting the results to the named request.
-
.future(n) ⇒ Object
How many jobs will happen in the next n seconds.
-
.immidiate(*args) ⇒ Object
Like at, but with time as time.now.
-
.in(t, *args) ⇒ Object
Run this job t seconds from now.
-
.load ⇒ Object
The number of jobs currently backlogged in the system.
-
.pid ⇒ Object
The PID of the worker process.
-
.pid=(p) ⇒ Object
Sets the process id of the worker process if known.
- .queue_time ⇒ Object
-
.reschedule(update, hash = {}) ⇒ Object
Create a new job having the same characteristics as the old, except that ‘hash’ will override the original.
-
.schedule(hash) ⇒ Object
Advanced: This method allows values to be passed directly to the ORM layer’s create method.
-
.time ⇒ Object
The time class used by Updater.
-
.time=(klass) ⇒ Object
By default Updater will use the system time (Time class) to get the current time.
-
.work_off(worker) ⇒ Object
Gets a single job from the queue, locks it and runs it.
-
.worker_set(limit = 5, options = {}) ⇒ Object
This returns a set of update requests.
Instance Method Summary
- #==(other) ⇒ Object
-
#id ⇒ Object
This is the appropriate value to use for a chainable field value.
-
#initialize(orm_inst) ⇒ Update
constructor
orm_inst must be set to an instance of the class Update.orm.
-
#inspect ⇒ Object
:nodoc:.
-
#lock(worker) ⇒ Object
Attempt to lock this record for the worker.
- #locked? ⇒ Boolean
- #locked_by ⇒ Object
-
#method_missing(method, *args) ⇒ Object
See if this method was intended for the underlying ORM layer.
-
#name ⇒ Object
Jobs may be named to make them easier to find.
-
#name=(n) ⇒ Object
Jobs may be named to make them easier to find.
-
#persistant? ⇒ Boolean
If this is true, the job will NOT be removed after it is run.
-
#run(job = nil) ⇒ Object
Send the method with args to the target.
-
#run_with_lock(worker) ⇒ Object
Like run, but first acquires a lock for the worker.
-
#sub_args(job, a) ⇒ Object
Takes a previous job and the original array of arguments from the data store.
-
#target ⇒ Object
Returns the Class or instance that will receive the method call.
Constructor Details
#initialize(orm_inst) ⇒ Update
orm_inst must be set to an instance of the class Update.orm.
# File 'lib/updater/update.rb', line 62
def initialize(orm_inst)
  raise ArgumentError if orm_inst.nil? || !orm_inst.kind_of?(orm)
  @orm = orm_inst
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args) ⇒ Object
See if this method was intended for the underlying ORM layer.
# File 'lib/updater/update.rb', line 46
def method_missing(method, *args)
  @orm.send(method,*args)
end
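The forwarding pattern above can be tried in isolation. The following is a minimal sketch, not Updater's code: `FakeRecord` and `DelegatingJob` are hypothetical names, and the `respond_to_missing?` override is an addition assumed here so that `respond_to?` agrees with what `method_missing` will accept.

```ruby
# Hypothetical stand-in for an ORM record; not part of Updater.
FakeRecord = Struct.new(:time, :lock_name)

# Minimal wrapper that forwards unknown messages to the wrapped record,
# in the same way as the method_missing listing above.
class DelegatingJob
  def initialize(record)
    @orm = record
  end

  def method_missing(method, *args)
    @orm.send(method, *args)
  end

  # Assumption: added for this sketch so respond_to? matches the forwarding.
  def respond_to_missing?(method, include_private = false)
    @orm.respond_to?(method, include_private) || super
  end
end

job = DelegatingJob.new(FakeRecord.new(1_700_000_000, nil))
job.time                     # forwarded to FakeRecord#time
job.lock_name = "worker-1"   # forwarded to FakeRecord#lock_name=
```

Messages that the wrapped record does not understand still raise NoMethodError, this time from the record itself.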
Class Attribute Details
.finder_id ⇒ Object
This is the application level default method to call on an instance type target. It should return a value to be passed to the #finder_method (above) in order to retrieve the instance from the datastore (e.g. id). In most circumstances the ORM layer defines an appropriate default and this does not need to be explicitly set.
MongoDB is one significant exception to this rule. The Updater Mongo ORM layer uses the 10gen MongoDB driver directly without an ORM such as Mongoid or Mongo_Mapper. If the application uses one of these ORMs, #finder_method and #finder_id should be explicitly set.
# File 'lib/updater/update.rb', line 186
def finder_id
  @finder_id
end
.finder_method ⇒ Object
This is the application level default method to call on a class in order to find/create a target instance (e.g. find, get, find_one, etc.). In most circumstances the ORM layer defines an appropriate default and this does not need to be explicitly set.
MongoDB is one significant exception to this rule. The Updater Mongo ORM layer uses the 10gen MongoDB driver directly without an ORM such as Mongoid or Mongo_Mapper. If the application uses one of these ORMs, #finder_method and #finder_id should be explicitly set.
# File 'lib/updater/update.rb', line 176
def finder_method
  @finder_method
end
.logger ⇒ Object
Returns the logger instance. If it has not been set, a new Logger will be created pointing to STDOUT.
# File 'lib/updater/update.rb', line 203
def logger
  @logger ||= Logger.new(STDOUT)
end
.orm ⇒ Object
This attribute must be set to some ORM that will persist the data. The value is normally set using one of the methods in Updater::Setup.
# File 'lib/updater/update.rb', line 167
def orm
  @orm
end
.socket ⇒ Object
This is an open IO socket that will be written to when a job is scheduled. If it is unset then @pid is signaled instead.
# File 'lib/updater/update.rb', line 197
def socket
  @socket
end
Instance Attribute Details
#error ⇒ Object (readonly)
Contains the error caught in run. Not stored to the database.
# File 'lib/updater/update.rb', line 8
def error
  @error
end
#orm ⇒ Object (readonly)
Contains the underlying ORM instance (e.g. ORM::Datamapper or ORM Mongo).
# File 'lib/updater/update.rb', line 11
def orm
  @orm
end
#params ⇒ Object
To reduce the proliferation of chained jobs in the queue, a chain request may carry a params value that passes specific values to the chained method. When a chained instance is created, the job processor will set this value. It will then be sent to the target method in place of ‘__params__’. See #sub_args.
# File 'lib/updater/update.rb', line 18
def params
  @params
end
Class Method Details
.at(time, target, method, args = [], options = {}) ⇒ Object
Request that the target be sent the method with args at the given time.
Parameters
time <Integer | Object responding to to_i>. By default, the number of seconds since the epoch. What ‘time’ references can be changed by passing a substitute class to the time= method.
target <Class | instance>. If target is a class then ‘method’ will be sent to that class (unless the finder option is used). Otherwise, the target will be assumed to be the result of (target.class).get(target.id). The finder method (:get by default) and the finder_args (target.id by default) can be set in the options. A DataMapper instance passed as the target will “just work.” Any object that can be found in this manner is known as a ‘conforming instance’.
method <Symbol>. The method that will be sent to the calculated target.
args <Array>. A list of arguments to be sent with the method call. Note: ‘args’ must be serializable with Marshal.dump. Defaults to [].
options <Hash>. Additional options that will be used to configure the request. See the Options section below.
Options
:finder <Symbol>. This method will be sent to the stored target class (either target or target.class) in order to extract the instance on which to perform the request. By default :get is used. For example, to use on an ActiveRecord class:
:finder=>:find
:finder_args <Array> | <Object>. This is passed to the finder function. By default it is target.id. Note that by setting :finder_args you will force Updater to calculate an instance as the computed target even if you pass a Class as the target.
:name <String>. A string sent by the requesting class to identify the request. ‘name’ must be unique for a given computed target. Names cannot be used effectively when a Class has non-conforming instances, as there is no way to predict the results of a finder call. ‘name’ can be used in conjunction with the for method to manipulate requests affecting an object or class after they are set. See for for examples.
:failure <Updater>. Another request to be run if this request raises an error. Usually the failure request will be created with the chain method.
Examples
Updater.at(Chronic.parse('tomorrow'),Foo,:bar,[]) # will run Foo.bar() tomorrow at midnight
f = Foo.create
u = Updater.at(Chronic.parse('2 hours from now'),f,:bar,[]) # will run Foo.get(f.id).bar in 2 hours
# File 'lib/updater/update.rb', line 292
def at(t,target,method = nil,args=[],options={})
  hash = Hash.new
  hash[:time] = t.to_i unless t.nil?

  hash[:target],hash[:finder],hash[:finder_args] = target_for(target, options)

  hash[:method] = method || :perform
  hash[:method_args] = args

  [:name,:failure,:success,:ensure].each do |opt|
    hash[opt] = options[opt] if options[opt]
  end

  hash[:persistant] = options[:persistant] || t.nil? ? true : false

  schedule(hash)
end
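To see how the arguments of at end up in the hash handed to schedule, the sketch below rebuilds that hash without any ORM. `build_request` is a hypothetical helper written for illustration; it skips `target_for` (whose source is not shown on this page) and stores the target unchanged, so the :finder and :finder_args entries are omitted.

```ruby
# Hypothetical helper that builds a request hash in the style of .at.
# Assumption: the target is stored as-is because target_for is not shown here.
def build_request(t, target, method = nil, args = [], options = {})
  hash = {}
  hash[:time] = t.to_i unless t.nil?   # chained requests have no time
  hash[:target] = target
  hash[:method] = method || :perform   # default method name
  hash[:method_args] = args
  [:name, :failure, :success, :ensure].each do |opt|
    hash[opt] = options[opt] if options[opt]
  end
  # A request with no time (a chained request) is always persistent.
  hash[:persistant] = (options[:persistant] || t.nil?) ? true : false
  hash
end

timed   = build_request(Time.at(100), String, :new, ["x"], :name => "greeting")
chained = build_request(nil, String, :new)
```

In this sketch `timed` carries a :time of 100 and is not persistent, while `chained` has no :time key and is persistent, which matches the behaviour described for chain.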
.chain(*args) ⇒ Object
Like at, but without a time to run. This is used to create requests that run in response to the failure of other requests. See at for details.
# File 'lib/updater/update.rb', line 342
def chain(*args)
  at(nil,*args)
end
.clear_all ⇒ Object
Removes all scheduled (pending) jobs. Mostly intended for testing, but may also be useful in cases of crashes or system corruption.
# File 'lib/updater/update.rb', line 404
def clear_all
  @orm.clear_all
end
.clear_locks(worker) ⇒ Object
Ensure that a worker no longer holds any locks.
# File 'lib/updater/update.rb', line 222
def clear_locks(worker); @orm.clear_locks(worker); end
.current ⇒ Object
A filter for all requests that are ready to run, that is, they were requested to run at or before time.now.
# File 'lib/updater/update.rb', line 381
def current
  @orm.current
end
.delayed ⇒ Object
A filter for all requests that are not yet ready to run, that is, their time is after time.now.
# File 'lib/updater/update.rb', line 391
def delayed
  @orm.delayed
end
.for(target, name = nil) ⇒ Object
Retrieves all updates for a conforming target possibly limiting the results to the named request.
Parameters
target <Class | Object>. A class or conforming object that is potentially the calculated target of a request.
name (optional) <String>. If a name is sent, the first request for this target with this name will be returned.
Returns
<Array> of requests, unless name is given, in which case only a single [Updater] instance is returned.
# File 'lib/updater/update.rb', line 360
def for(target,name=nil)
  target,finder,args = target_for(target)
  ret = @orm.for(target,finder,args,name).map {|i| new(i)}
  name ? ret.first : ret
end
.future(n) ⇒ Object
How many jobs will happen in the next n seconds. When called with two arguments, they are passed to the ORM layer as the start and finish of the window.
# File 'lib/updater/update.rb', line 397
def future(start,finish = nil)
  start, finish = [0, start] unless finish
  @orm.future(start,finish)
end
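The argument handling in future can be illustrated without an ORM. The sketch below is an in-memory approximation: `jobs_in_window` is a hypothetical helper, and the inclusive window measured in seconds from `now` is an assumption, since the real count is delegated to the ORM layer.

```ruby
# Hypothetical in-memory version of the window logic.
# Assumption: the window is inclusive and measured in seconds from `now`.
def jobs_in_window(job_times, now, start, finish = nil)
  # With a single argument the window runs from now to now + start,
  # using the same argument swap as .future.
  start, finish = [0, start] unless finish
  job_times.count { |t| t >= now + start && t <= now + finish }
end

times = [105, 130, 170, 400]
next_minute  = jobs_in_window(times, 100, 60)       # window 100..160
minute_after = jobs_in_window(times, 100, 60, 120)  # window 160..220
```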
.immidiate(*args) ⇒ Object
Like at, but with time as time.now. Generally this will be used to run a long-running operation asynchronously in a different process. See at for details.
# File 'lib/updater/update.rb', line 336
def immidiate(*args)
  at(time.now,*args)
end
.in(t, *args) ⇒ Object
Run this job t seconds from now. See at for details on expected args.
# File 'lib/updater/update.rb', line 311
def in(t,*args)
  at(time.now+t,*args)
end
.load ⇒ Object
The number of jobs currently backlogged in the system.
# File 'lib/updater/update.rb', line 386
def load
  @orm.current_load
end
.pid ⇒ Object
The PID of the worker process.
# File 'lib/updater/update.rb', line 428
def pid
  @pid
end
.pid=(p) ⇒ Object
Sets the process id of the worker process if known. If this is set then an attempt will be made to signal the worker any time a new update is made.
If pid is not set, or is set to nil, then the scheduling program is responsible for waking up a potentially sleeping worker process in another way.
# File 'lib/updater/update.rb', line 417
def pid=(p)
  return @pid = nil unless p #tricky assignment in return
  @pid = Integer("#{p}") #safety check that prevents a corrupted PID file from crashing the system
  Process::kill 0, @pid #check that the process exists
  @pid
rescue Errno::ESRCH, ArgumentError
  @pid = nil
  raise ArgumentError, "PID was invalid"
end
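The validation performed by pid= relies on two standard Ruby behaviours: `Integer()` raises ArgumentError for non-numeric strings, and sending signal 0 with `Process.kill` checks that a process exists without disturbing it. A minimal standalone sketch follows; `checked_pid` is a hypothetical name, not part of Updater.

```ruby
# Hypothetical standalone version of the PID check.
# Returns the PID as an Integer, nil when no PID is given, and raises
# ArgumentError when the value is not a number or no such process exists.
def checked_pid(p)
  return nil unless p
  pid = Integer(p.to_s)   # rejects garbage such as a corrupted PID file
  Process.kill(0, pid)    # signal 0: existence check only, nothing is delivered
  pid
rescue Errno::ESRCH, ArgumentError
  raise ArgumentError, "PID was invalid"
end

own = checked_pid(Process.pid.to_s)  # the current process certainly exists
```

Note that a process owned by another user would raise Errno::EPERM, which neither this sketch nor the listing above rescues.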
.queue_time ⇒ Object
# File 'lib/updater/update_dm.rb', line 263
def queue_time
  nxt = self.first(:time.not=>nil,:lock_name=>nil, :order=>[:time.asc])
  return nil unless nxt
  return 0 if nxt.time <= time.now.to_i
  return nxt.time - time.now.to_i
end
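queue_time carries no description, but the listing suggests it reports how many seconds remain until the next unlocked, timed job is due. A rough in-memory sketch of that calculation, assuming job times are plain integers and using the hypothetical helper name `seconds_until_next`:

```ruby
# Hypothetical in-memory version of the queue_time calculation.
# job_times: integer run times of unlocked jobs; nil marks a job with no time.
def seconds_until_next(job_times, now)
  nxt = job_times.compact.min   # earliest scheduled time, ignoring untimed jobs
  return nil unless nxt         # nothing is scheduled
  [nxt - now, 0].max            # overdue jobs are reported as due now
end

wait    = seconds_until_next([150, nil, 120], 100)
overdue = seconds_until_next([90, 120], 100)
```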
.reschedule(update, hash = {}) ⇒ Object
Create a new job having the same characteristics as the old, except that ‘hash’ will override the original.
# File 'lib/updater/update.rb', line 327
def reschedule(update, hash={})
  new_job = update.orm.dup
  new_job.update_attributes(hash)
  new_job.save
  new(new_job)
end
.schedule(hash) ⇒ Object
Advanced: This method allows values to be passed directly to the ORM layer’s create method. Use at and friends for everyday use cases.
# File 'lib/updater/update.rb', line 317
def schedule(hash)
  r = new(@orm.create(hash))
  signal_worker
  r
rescue NoMethodError
  raise ArgumentError, "ORM not initialized!" if @orm.nil?
  raise
end
.time ⇒ Object
The time class used by Updater. See time=.
# File 'lib/updater/update.rb', line 367
def time
  @time ||= Time
end
.time=(klass) ⇒ Object
By default Updater will use the system time (Time class) to get the current time. The application that Updater was developed for used a game clock that could be paused or restarted. This method allows us to substitute a custom class for Time. This class must respond to the #now method with an Integer or Time.
# File 'lib/updater/update.rb', line 375
def time=(klass)
  @time = klass
end
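As an illustration of a substitute time class, here is a minimal sketch of a pausable clock. `GameClock` and its `resume` and `pause` methods are hypothetical; the only requirement stated above is that the class responds to `now` with an Integer or Time. The optional `wall` argument exists only so the example is deterministic.

```ruby
# Hypothetical pausable clock that counts game seconds.
class GameClock
  @elapsed = 0        # game seconds accumulated before the last pause
  @started_at = nil   # wall-clock second of the last resume, nil while paused

  class << self
    def resume(wall = Time.now.to_i)
      @started_at ||= wall
    end

    def pause(wall = Time.now.to_i)
      return unless @started_at
      @elapsed += wall - @started_at
      @started_at = nil
    end

    # Returns an Integer, which supports both time.now + t and t.to_i.
    def now(wall = Time.now.to_i)
      @started_at ? @elapsed + (wall - @started_at) : @elapsed
    end
  end
end

GameClock.resume(100)
GameClock.pause(130)            # 30 game seconds have elapsed
GameClock.resume(600)           # the paused interval is not counted
elapsed = GameClock.now(610)
```

With Updater loaded, such a class would presumably be installed with `Updater::Update.time = GameClock`, after which scheduled times are interpreted as game seconds rather than epoch seconds.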
.work_off(worker) ⇒ Object
Gets a single job from the queue, locks it and runs it.
# File 'lib/updater/update.rb', line 210
def work_off(worker)
  inst = @orm.lock_next(worker)
  if inst
    worker.logger.debug " running job #{inst.id}"
    new(inst).run
  end
  @orm.queue_time
ensure
  clear_locks(worker)
end
.worker_set(limit = 5, options = {}) ⇒ Object
This returns a set of update requests. The first parameter is the maximum number to return (get only a few, as other workers may be in competition). The second, optional parameter is a hash of options to be passed to DataMapper.
# File 'lib/updater/update_dm.rb', line 234
def worker_set(limit = 5, options={})
  #TODO: add priority to this.
  options = {:lock_name=>nil,:limit=>limit, :order=>[:time.asc]}.merge(options)
  current.all(options)
end
Instance Method Details
#==(other) ⇒ Object
# File 'lib/updater/update.rb', line 82
def ==(other)
  id == other.id # compare ids; the extracted listing showed an assignment (=) here
end
#id ⇒ Object
This is the appropriate value to use for a chainable field value.
# File 'lib/updater/update.rb', line 78
def id
  @orm.id
end
#inspect ⇒ Object
:nodoc:
# File 'lib/updater/update_dm.rb', line 293
def inspect
  "#<Updater::Update target=#{target.inspect} time=#{orm.time}>"
rescue TargetMissingError
  "#<Updater::Update target=<missing> time=#{orm.time}>"
end
#lock(worker) ⇒ Object
Attempt to lock this record for the worker.
# File 'lib/updater/update_dm.rb', line 51
def lock(worker)
  return true if locked? && locked_by == worker.name
  #all this to make sure the check and the lock are simultaneous:
  cnt = repository.update({properties[:lock_name]=>worker.name},self.class.all(:id=>self.id,:lock_name=>nil))
  if 0 != cnt
    @lock_name = worker.name
    true
  else
    worker.say( "Worker #{worker.name} Failed to aquire lock on job #{id}" )
    false
  end
end
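The listing above makes the check and the lock a single conditional update (set lock_name only where it is still nil), so two workers cannot both win. The same idea can be sketched in memory with a Mutex in place of the database. `InMemoryJobStore` is a hypothetical class and is not how Updater stores locks.

```ruby
# Hypothetical in-memory lock table illustrating check-and-set locking.
class InMemoryJobStore
  def initialize
    @locks = {}          # job id => name of the worker holding the lock
    @mutex = Mutex.new   # stands in for the atomicity of the single UPDATE
  end

  # Returns true if worker_name holds the lock on job_id after the call.
  def lock(job_id, worker_name)
    @mutex.synchronize do
      holder = @locks[job_id]
      return true if holder == worker_name   # already ours
      return false unless holder.nil?        # held by another worker
      @locks[job_id] = worker_name           # free, so take it
      true
    end
  end
end

store  = InMemoryJobStore.new
first  = store.lock(1, "worker-a")   # acquires the lock
second = store.lock(1, "worker-b")   # refused, worker-a holds it
again  = store.lock(1, "worker-a")   # re-entrant for the holder
```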
#locked? ⇒ Boolean
# File 'lib/updater/update_dm.rb', line 64
def locked?
  not @lock_name.nil?
end
#locked_by ⇒ Object
# File 'lib/updater/update_dm.rb', line 68
def locked_by
  @lock_name
end
#name ⇒ Object
Jobs may be named to make them easier to find.
# File 'lib/updater/update.rb', line 73
def name
  @orm.name
end
#name=(n) ⇒ Object
Jobs may be named to make them easier to find.
# File 'lib/updater/update.rb', line 68
def name=(n)
  @orm.name=n
end
#persistant? ⇒ Boolean
If this is true, the job will NOT be removed after it is run. This is usually true for chained Jobs.
# File 'lib/updater/update.rb', line 87
def persistant?
  @orm.persistant
end
#run(job = nil) ⇒ Object
Send the method with args to the target.
# File 'lib/updater/update.rb', line 21
def run(job=nil)
  ret = true #put return in scope
  begin
    t = target
    final_args = sub_args(job,@orm.method_args)
    t.send(@orm.method.to_sym,*final_args)
  rescue => e
    @error = e
    run_chain :failure
    ret = false
  ensure
    run_chain :success if ret
    run_chain :ensure
    begin
      @orm.destroy unless @orm.persistant
    rescue StandardError => e
      raise e unless e.class.to_s =~ /Connection/
      sleep 0.1
      retry
    end
  end
  ret
end
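The order in which run fires its chains is easier to see with the ORM removed. The sketch below records which chain would run instead of calling `run_chain`; `run_job` and the `events` array are hypothetical names used only for illustration.

```ruby
# Hypothetical skeleton of the rescue/ensure flow in #run.
# Appends the chain names that would fire, in order, to `events`.
def run_job(events)
  ok = true
  begin
    yield                          # stands in for sending the method to the target
  rescue StandardError
    events << :failure             # the failure chain runs first on error
    ok = false
  ensure
    events << :success if ok       # the success chain runs only when nothing raised
    events << :ensure              # the ensure chain always runs
  end
  ok
end

good_events = []
good = run_job(good_events) { 1 + 1 }

bad_events = []
bad = run_job(bad_events) { raise "boom" }
```

A successful job therefore triggers the success chain and then the ensure chain, while a failing job triggers the failure chain and then the ensure chain, and the return value reports which case occurred.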
#run_with_lock(worker) ⇒ Object
Like run, but first acquires a lock for the worker. Will return the result of run, or nil if the record could not be locked.
# File 'lib/updater/update_dm.rb', line 74
def run_with_lock(worker)
  run if lock(worker)
end
#sub_args(job, a) ⇒ Object
Use and Purpose
Takes a previous job and the original array of arguments from the data store. It replaces three special values with meta information from Updater. This is done to allow chained jobs to respond to specific conditions in the originating job.
Substitutions
The following strings are replaced with meta information from the calling job as described below:
-
‘__job__’: replaced with the instance of Updater::Update that chained into this job. If the job failed (that is, raised an error while being run), this instance will contain an error field with that error.
-
‘__params__’: this is an optional field of a chain instance. It allows the chaining job to set specific options for the chained job to use. For example, a chained job that reschedules the original job might take an option defining how frequently the job is rescheduled. This would be passed in the params field. (See example in Updater::Chained – Pending!)
-
‘__self__’: this is simply set to the instance of Updater::Update that is calling the method. This might be useful for both chained and original jobs that need to manipulate or inspect the job that called them. Without this field, it would be impossible for a method to consistently determine whether it had been run from a background job or invoked directly by the app.
# File 'lib/updater/update.rb', line 123
def sub_args(job,a)
  a.map do |e|
    begin
      case e.to_s
      when '__job__'
        job
      when '__params__'
        @params
      when '__self__'
        self
      else
        e
      end
    # For the unfortunate case where e doesn't handle to_s nicely.
    # On the other hand I dare someone to find something that can be marshaled,
    # but doesn't do #to_s.
    rescue NoMethodError=>err
      raise err unless err.message =~ /\`to_s\'/
      e
    end #begin
  end# map
end
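The substitution rules can be exercised on their own. In the sketch below, `substitute_args` is a hypothetical free-standing function: it takes the three replacement values as keyword arguments rather than reading `@params` and `self` from an Update instance, and it leaves out the NoMethodError guard.

```ruby
# Hypothetical standalone version of the placeholder substitution.
def substitute_args(args, job:, params:, current:)
  args.map do |e|
    case e.to_s
    when '__job__'    then job      # the job that chained into this one
    when '__params__' then params   # options set by the chaining job
    when '__self__'   then current  # the job that is making the call
    else e                          # ordinary arguments pass through unchanged
    end
  end
end

stored = ['__job__', 5, '__params__', '__self__']
final  = substitute_args(stored, job: :previous_job,
                                 params: { every: 60 },
                                 current: :this_job)
```

Because the comparison uses `to_s`, a Symbol such as `:__job__` is substituted in the same way as the String.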
#target ⇒ Object
Returns the Class or instance that will receive the method call. See Updater.at for information about how a target is derived.
# File 'lib/updater/update.rb', line 55
def target
  target = @orm.finder.nil? ? @orm.target : @orm.target.send(@orm.finder,@orm.finder_args)
  raise TargetMissingError, "Target missing --Class:'#{@orm.target}' Finder:'#{@orm.finder}', Args:'#{@orm.finder_args.inspect}'" unless target
  target
end
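Target resolution comes down to one decision: with no finder the stored class is the target, otherwise the finder is sent to the class with the stored arguments. A minimal sketch with a hypothetical in-memory class follows; `Widget`, `MissingTarget` and `resolve_target` are illustrative names and not part of Updater.

```ruby
# Hypothetical error class standing in for TargetMissingError.
class MissingTarget < StandardError; end

# Hypothetical conforming class: instances can be found again by id via .get.
class Widget
  attr_reader :id
  @store = {}

  class << self
    def create(id)
      @store[id] = new(id)
    end

    def get(id)
      @store[id]
    end
  end

  def initialize(id)
    @id = id
  end
end

# Resolves the computed target in the same way as #target.
def resolve_target(target, finder = nil, finder_args = nil)
  found = finder.nil? ? target : target.send(finder, finder_args)
  raise MissingTarget, "no target for #{target} via #{finder.inspect}" unless found
  found
end

widget    = Widget.create(7)
by_class  = resolve_target(Widget)            # the class itself is the target
by_finder = resolve_target(Widget, :get, 7)   # the instance is looked up again
```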