Class: Updater::Update

Inherits:
Object
Includes:
DataMapper::Resource
Defined in:
lib/updater/update.rb,
lib/updater/update_dm.rb

Overview

The basic class that drives Updater.

Constructor Details

#initialize(orm_inst) ⇒ Update

orm_inst must be an instance of the class stored in Update.orm.

Raises:

  • (ArgumentError)


# File 'lib/updater/update.rb', line 62

def initialize(orm_inst)
  # Reject nil and anything that is not an instance of the configured ORM class.
  raise ArgumentError if orm_inst.nil? || !orm_inst.kind_of?(self.class.orm)
  @orm = orm_inst
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args) ⇒ Object

Forwards any method not defined on Update to the underlying ORM instance.

# File 'lib/updater/update.rb', line 46

def method_missing(method, *args)
  @orm.send(method,*args)
end
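
The forwarding above can be illustrated with a minimal, self-contained sketch. FakeOrm and Wrapper are hypothetical stand-ins and not part of Updater; the respond_to_missing? override and the respond_to? guard are assumptions added for the sketch and do not appear in the listing.

```ruby
# Hypothetical stand-in for an ORM record; not part of Updater.
class FakeOrm
  attr_accessor :time

  def initialize(time)
    @time = time
  end
end

# Hypothetical wrapper that forwards unknown methods to the wrapped object,
# in the same spirit as Update#method_missing forwarding to @orm.
class Wrapper
  def initialize(orm_inst)
    @orm = orm_inst
  end

  def method_missing(method, *args, &block)
    # Only forward what the wrapped object understands; otherwise fall back
    # to the default NoMethodError.
    return super unless @orm.respond_to?(method)
    @orm.send(method, *args, &block)
  end

  # Assumption: keeps respond_to? consistent with the forwarding above.
  def respond_to_missing?(method, include_private = false)
    @orm.respond_to?(method, include_private) || super
  end
end

wrapped = Wrapper.new(FakeOrm.new(1_700_000_000))
wrapped.time = 42 # forwarded to FakeOrm#time=
puts wrapped.time # forwarded to FakeOrm#time
```

Guarding with respond_to? means a misspelled method name fails on the wrapper itself rather than deep inside the ORM object.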

Class Attribute Details

.finder_idObject

This is the application-level default method to call on an instance-type target. It should return a value to be passed to #finder_method in order to retrieve the instance from the datastore (e.g. id). In most circumstances the ORM layer defines an appropriate default and this does not need to be explicitly set.

MongoDB is one significant exception to this rule. The Updater Mongo ORM layer uses the 10gen MongoDB driver directly, without an ORM such as Mongoid or Mongo_Mapper. If the application uses one of these ORMs, #finder_method and #finder_id should be explicitly set.

# File 'lib/updater/update.rb', line 186

def finder_id
  @finder_id
end

.finder_methodObject

This is the application-level default method to call on a class in order to find or create a target instance (e.g. find, get, find_one, etc.). In most circumstances the ORM layer defines an appropriate default and this does not need to be explicitly set.

MongoDB is one significant exception to this rule. The Updater Mongo ORM layer uses the 10gen MongoDB driver directly, without an ORM such as Mongoid or Mongo_Mapper. If the application uses one of these ORMs, #finder_method and #finder_id should be explicitly set.

# File 'lib/updater/update.rb', line 176

def finder_method
  @finder_method
end

.loggerObject

Returns the logger instance. If it has not been set, a new Logger will be created pointing to STDOUT



# File 'lib/updater/update.rb', line 203

def logger
  @logger ||= Logger.new(STDOUT)
end

.ormObject

This attribute must be set to some ORM that will persist the data. The value is normally set using one of the methods in Updater::Setup.



# File 'lib/updater/update.rb', line 167

def orm
  @orm
end

.socketObject

This is an open IO socket that will be written to when a job is scheduled. If it is unset then @pid is signaled instead.

# File 'lib/updater/update.rb', line 197

def socket
  @socket
end

Instance Attribute Details

#errorObject (readonly)

Contains the exception object after an error is caught in run. Not stored to the database.

# File 'lib/updater/update.rb', line 8

def error
  @error
end

#ormObject (readonly)

Contains the underlying ORM instance (eg. ORM::Datamapper or ORM Mongo)



# File 'lib/updater/update.rb', line 11

def orm
  @orm
end

#paramsObject

In order to reduce the proliferation of chained jobs in the queue, a job's chain requests are allowed a params value that passes specific values to a chained method. When a chained instance is created, the job processor will set this value. It will then be sent to the target method in place of ‘__params__’. See #sub_args.

# File 'lib/updater/update.rb', line 18

def params
  @params
end

Class Method Details

.at(time, target, method, args = [], options = {}) ⇒ Object

Request that the target be sent the method with args at the given time.

Parameters

time <Integer | Object responding to to_i>. By default, the number of seconds since the epoch.

What ‘time’ references can be changed by passing a substitute class to the time= method.

target <Class | instance>. If target is a class then ‘method’ will be sent to that class (unless the :finder option is used). Otherwise, the target will be assumed to be the result of (target.class).get(target.id). The finder method (:get by default) and the finder_args (target.id by default) can be set in the options. A DataMapper instance passed as the target will “just work.” Any object that can be found in this manner is known as a ‘conforming instance’.

method <Symbol>. The method that will be sent to the calculated target. Defaults to :perform when nil.

args <Array>. A list of arguments to be sent with the method call. Note: ‘args’ must be serializable with Marshal.dump. Defaults to [].

options <Hash>. Additional options that will be used to configure the request. See the Options section below.

Options

:finder <Symbol>. This method will be sent to the stored target class (either target or target.class) in order to extract the instance on which to perform the request. By default :get is used. For example, to use an ActiveRecord class:

:finder=>:find

:finder_args <Array> | <Object>. This is passed to the finder function. By default it is target.id. Note that by setting :finder_args you will force Updater to calculate an instance as the computed target even if you pass a Class as the target.

:name <String>. A string sent by the requesting class to identify the request. ‘name’ must be unique for a given computed target. Names cannot be used effectively when a Class has non-conforming instances, as there is no way to predict the results of a finder call. ‘name’ can be used in conjunction with the for method to manipulate requests affecting an object or class after they are set. See for for examples.

:failure <Updater>. Another request to be run if this request raises an error. Usually the failure request will be created with the chain method.

Examples

Updater.at(Chronic.parse('tomorrow'),Foo,:bar,[]) # will run Foo.bar() tomorrow

f = Foo.create
u = Updater.at(Chronic.parse('2 hours from now'),f,:bar,[]) # will run Foo.get(f.id).bar in 2 hours


# File 'lib/updater/update.rb', line 292

def at(t,target,method = nil,args=[],options={})
  hash = Hash.new
  hash[:time] = t.to_i unless t.nil?
  
  hash[:target],hash[:finder],hash[:finder_args] = target_for(target, options)
  
  hash[:method] = method || :perform
  hash[:method_args] = args
  
  [:name,:failure,:success,:ensure].each do |opt|
    hash[opt] = options[opt] if options[opt]
  end
  
  # Persistent when requested explicitly or when no time is given (chained jobs).
  hash[:persistant] = (options[:persistant] || t.nil?) ? true : false
  
  schedule(hash)
end

.chain(*args) ⇒ Object

Like at but without a time to run. This is used to create requests that run in response to the failure of other requests. See at for details.

# File 'lib/updater/update.rb', line 342

def chain(*args)
  at(nil,*args)
end

.clear_allObject

Removes all pending jobs. Mostly intended for testing, but may also be useful in cases of crashes or system corruption.

# File 'lib/updater/update.rb', line 404

def clear_all
  @orm.clear_all
end

.clear_locks(worker) ⇒ Object

Ensure that a worker no longer holds any locks.



# File 'lib/updater/update.rb', line 222

def clear_locks(worker); @orm.clear_locks(worker); end

.currentObject

A filter for all requests that are ready to run, that is, those requested to run at or before time.now.

# File 'lib/updater/update.rb', line 381

def current
  @orm.current
end

.delayedObject

A filter for all requests that are not yet ready to run, that is, those whose time is after time.now.

# File 'lib/updater/update.rb', line 391

def delayed
  @orm.delayed
end

.for(target, name = nil) ⇒ Object

Retrieves all updates for a conforming target, possibly limiting the results to the named request.

Parameters

target <Class | Object>. A class or conforming object that is potentially the calculated target of a request.

name (optional) <String>. If a name is given, only the first request for this target with this name will be returned.

Returns

<Array> of [Updater] instances, unless name is given, in which case only a single [Updater] instance is returned (or nil if none matches).

# File 'lib/updater/update.rb', line 360

def for(target,name=nil)
  target,finder,args = target_for(target)
  ret = @orm.for(target,finder,args,name).map {|i| new(i)}
  name ? ret.first : ret
end

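.future(start, finish = nil) ⇒ Object

How many jobs will happen between start and finish seconds from now. When only one argument n is given, it is treated as finish and start defaults to 0, giving the number of jobs in the next n seconds.

# File 'lib/updater/update.rb', line 397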

def future(start,finish = nil)
  start, finish = [0, start] unless finish 
  @orm.future(start,finish)
end
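
The argument handling in this listing can be tried in isolation. The sketch below uses a hypothetical helper, future_window, that returns the normalized pair instead of querying an ORM.

```ruby
# Hypothetical helper mirroring the argument handling in Update.future:
# a single argument n is treated as the window from 0 to n seconds.
def future_window(start, finish = nil)
  start, finish = [0, start] unless finish
  [start, finish]
end

p future_window(60)      # => [0, 60]
p future_window(60, 120) # => [60, 120]
```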

.immidiate(*args) ⇒ Object

Like at but with time set to time.now. Generally this will be used to run a long-running operation asynchronously in a different process. See at for details.

# File 'lib/updater/update.rb', line 336

def immidiate(*args)
  at(time.now,*args)
end

.in(t, *args) ⇒ Object

Run this job ‘t’ seconds from now. See at for details on the expected args.

# File 'lib/updater/update.rb', line 311

def in(t,*args)
  at(time.now+t,*args)
end

.loadObject

The number of jobs currently backlogged in the system.

# File 'lib/updater/update.rb', line 386

def load
  @orm.current_load
end

.pidObject

The PID of the worker process



# File 'lib/updater/update.rb', line 428

def pid
  @pid
end

.pid=(p) ⇒ Object

Sets the process id of the worker process if known. If this is set then an attempt will be made to signal the worker any time a new update is made.

If pid is not set, or is set to nil, then the scheduling program is responsible for waking up a potentially sleeping worker process in another way.

# File 'lib/updater/update.rb', line 417

def pid=(p)
  return @pid = nil unless p #tricky assignment in return
  @pid = Integer("#{p}") # safety check that prevents a corrupted PID file from crashing the system
  Process::kill 0, @pid #check that the process exists
  @pid
rescue Errno::ESRCH, ArgumentError
  @pid = nil
  raise ArgumentError, "PID was invalid"
end
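
The validation technique used here (parse strictly with Integer, then send signal 0 to test for existence) can be sketched on its own. live_pid is a hypothetical helper; unlike pid=, which raises ArgumentError, this sketch returns nil for an invalid value, and it does not handle Errno::EPERM (a process that exists but belongs to another user).

```ruby
# Hypothetical helper: returns the PID as an Integer if it names a live
# process, or nil if the value is malformed or no such process exists.
def live_pid(raw)
  pid = Integer(raw.to_s.strip) # raises ArgumentError on garbage
  Process.kill(0, pid)          # signal 0 only checks for existence
  pid
rescue ArgumentError, Errno::ESRCH
  nil
end

p live_pid(Process.pid) # the current process certainly exists
p live_pid("not a pid") # => nil
```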

.queue_timeObject



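Returns the number of seconds until the next unlocked job is due: nil when no job is queued, and 0 when a job is already due.

# File 'lib/updater/update_dm.rb', line 263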

def queue_time
  nxt = self.first(:time.not=>nil,:lock_name=>nil, :order=>[:time.asc])
  return nil unless nxt
  return 0 if nxt.time <= time.now.to_i
  return nxt.time - time.now.to_i
end
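
The three possible results can be seen in a small sketch that replaces the DataMapper query with a plain array. seconds_until_next and its arguments are hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for the DataMapper query: job_times is a plain array
# of scheduled times (seconds since the epoch) for unlocked jobs.
def seconds_until_next(job_times, now)
  nxt = job_times.compact.min
  return nil unless nxt  # nothing queued
  return 0 if nxt <= now # a job is already due
  nxt - now              # seconds to wait
end

now = 1_000
p seconds_until_next([], now)             # => nil
p seconds_until_next([900, 1_500], now)   # => 0
p seconds_until_next([1_030, 1_500], now) # => 30
```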

.reschedule(update, hash = {}) ⇒ Object

Create a new job having the same characteristics as the old, except that the values in ‘hash’ will override the originals.

# File 'lib/updater/update.rb', line 327

def reschedule(update, hash={})
  new_job = update.orm.dup
  new_job.update_attributes(hash)
  new_job.save
  new(new_job)
end

.schedule(hash) ⇒ Object

Advanced: This method allows values to be passed directly to the ORM layer’s create method. Use at and friends for everyday use cases.

# File 'lib/updater/update.rb', line 317

def schedule(hash)
  r = new(@orm.create(hash))
  signal_worker
  r
rescue NoMethodError
  raise ArgumentError, "ORM not initialized!" if @orm.nil?
  raise
end

.timeObject

The time class used by Updater. See time=



# File 'lib/updater/update.rb', line 367

def time
  @time ||= Time
end

.time=(klass) ⇒ Object

By default Updater will use the system time (Time class) to get the current time. The application that Updater was developed for used a game clock that could be paused or restarted. This method allows us to substitute a custom class for Time. This class must respond to the #now method with an Integer or a Time.

# File 'lib/updater/update.rb', line 375

def time=(klass)
  @time = klass
end
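
As an illustration of the kind of class that could be substituted, here is a minimal sketch of a pausable clock. GameClock and its pause and resume methods are hypothetical; the only requirement stated above is that .now returns an Integer or a Time.

```ruby
# Hypothetical pausable clock; the name and the pause/resume methods are
# illustrative only.
class GameClock
  @offset = 0      # seconds of real time spent paused so far
  @paused_at = nil # real time at which the clock was paused, if paused

  class << self
    # Current game time in seconds; frozen while the clock is paused.
    def now
      base = @paused_at || Time.now.to_i
      base - @offset
    end

    def pause
      @paused_at ||= Time.now.to_i
    end

    def resume
      return unless @paused_at
      @offset += Time.now.to_i - @paused_at
      @paused_at = nil
    end
  end
end

GameClock.pause
frozen = GameClock.now
puts GameClock.now == frozen # the clock does not advance while paused
GameClock.resume
```

Assuming Updater is loaded, such a class would be installed with Updater::Update.time = GameClock.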

.work_off(worker) ⇒ Object

Gets a single job from the queue, locks it and runs it.

# File 'lib/updater/update.rb', line 210

def work_off(worker)
  inst = @orm.lock_next(worker)
  if inst
    worker.logger.debug "  running job #{inst.id}" 
    new(inst).run
  end
  @orm.queue_time
ensure
  clear_locks(worker)
end

.worker_set(limit = 5, options = {}) ⇒ Object

This returns a set of update requests. The first parameter is the maximum number to return (fetch only a few, since other workers may be in competition). The second, optional parameter is a hash of options to be passed to DataMapper.

# File 'lib/updater/update_dm.rb', line 234

def worker_set(limit = 5, options={})
  #TODO: add priority to this.
  options = {:lock_name=>nil,:limit=>limit, :order=>[:time.asc]}.merge(options)
  current.all(options)
end

Instance Method Details

#==(other) ⇒ Object



# File 'lib/updater/update.rb', line 82

def ==(other)
  id == other.id # two updates are equal when they wrap the same record id
end

#idObject

This is the appropriate value to use for a chainable field value.

# File 'lib/updater/update.rb', line 78

def id
  @orm.id
end

#inspectObject




# File 'lib/updater/update_dm.rb', line 293

def inspect
  "#<Updater::Update target=#{target.inspect} time=#{orm.time}>"
rescue TargetMissingError
  "#<Updater::Update target=<missing> time=#{orm.time}>"
end

#lock(worker) ⇒ Object

Attempt to lock this record for the worker.

# File 'lib/updater/update_dm.rb', line 51
def lock(worker)
  return true if locked? && locked_by == worker.name
  # all this to make sure the check and the lock are simultaneous:
  cnt = repository.update({properties[:lock_name]=>worker.name},self.class.all(:id=>self.id,:lock_name=>nil))
  if 0 != cnt
    @lock_name = worker.name
    true
  else
    worker.say( "Worker #{worker.name} Failed to aquire lock on job #{id}" )
    false
  end
end
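
The idea behind this listing is a conditional update: set the lock only where it is still nil, then look at how many rows changed. The sketch below imitates that with an in-memory table; JobTable and lock_if_free are hypothetical, and a Mutex stands in for the atomicity that the database provides.

```ruby
# Hypothetical in-memory table standing in for the database.
class JobTable
  def initialize
    @rows = {}         # id => lock_name (nil when unlocked)
    @mutex = Mutex.new # plays the role of the database's atomic update
  end

  def add(id)
    @rows[id] = nil
  end

  # In spirit: set lock_name where id matches and lock_name is still nil.
  # Returns the number of rows changed (0 or 1).
  def lock_if_free(id, worker_name)
    @mutex.synchronize do
      next 0 unless @rows.key?(id) && @rows[id].nil?
      @rows[id] = worker_name
      1
    end
  end
end

table = JobTable.new
table.add(7)
p table.lock_if_free(7, "worker-a") # => 1
p table.lock_if_free(7, "worker-b") # => 0
```

Because the check and the write happen in one step, two workers racing for the same job cannot both see it as free.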

#locked?Boolean

Returns:

  • (Boolean)


# File 'lib/updater/update_dm.rb', line 64

def locked?
  not @lock_name.nil?
end

#locked_byObject



# File 'lib/updater/update_dm.rb', line 68

def locked_by
  @lock_name
end

#nameObject

Jobs may be named to make them easier to find



# File 'lib/updater/update.rb', line 73

def name
  @orm.name
end

#name=(n) ⇒ Object

Jobs may be named to make them easier to find



# File 'lib/updater/update.rb', line 68

def name=(n)
  @orm.name=n
end

#persistant?Boolean

If this is true, the job will NOT be removed after it is run. This is usually true for chained Jobs.

Returns:

  • (Boolean)


# File 'lib/updater/update.rb', line 87

def persistant?
  @orm.persistant
end

#run(job = nil) ⇒ Object

Send the method with args to the target.



# File 'lib/updater/update.rb', line 21

def run(job=nil)
  ret = true #put return in scope
  begin
    t = target 
    final_args = sub_args(job,@orm.method_args)
    t.send(@orm.method.to_sym,*final_args)
  rescue => e
    @error = e
    run_chain :failure
    ret = false
  ensure
    run_chain :success if ret
    run_chain :ensure
    begin
      @orm.destroy unless @orm.persistant
    rescue StandardError => e
      raise e unless e.class.to_s =~ /Connection/
      sleep 0.1
      retry
    end
  end
  ret
end
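
The order in which the chains fire can be observed with a reduced sketch of the same control flow. run_sketch is a hypothetical stand-in that records chain names in an array instead of calling run_chain, and it omits the destroy and retry logic.

```ruby
# Hypothetical stand-in for Update#run: records which chains would fire
# instead of calling run_chain, so the control flow can be observed.
def run_sketch(fired)
  ret = true
  begin
    yield # stands in for t.send(method, *final_args)
  rescue StandardError
    fired << :failure # the error is caught, not re-raised
    ret = false
  ensure
    fired << :success if ret
    fired << :ensure # always runs
  end
  ret
end

ok = []
p run_sketch(ok) { :fine }         # => true
p ok                               # => [:success, :ensure]

bad = []
p run_sketch(bad) { raise "boom" } # => false
p bad                              # => [:failure, :ensure]
```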

#run_with_lock(worker) ⇒ Object

Like run but first acquires a lock for the worker. Returns the result of run, or nil if the record could not be locked.

# File 'lib/updater/update_dm.rb', line 74

def run_with_lock(worker)
  run if lock(worker)
end

#sub_args(job, a) ⇒ Object

Use and Purpose

Takes a previous job and the original array of arguments from the data store. It replaces three special values with meta information from Updater. This is done to allow chained jobs to respond to specific conditions in the originating job.

Substitutions

The following strings are replaced with meta information from the calling job as described below:

  • ‘__job__’: replaced with the instance of Updater::Update that chained into this job. If the job failed (that is, raised an error while being run), this instance will contain an error field with that error.

  • ‘__params__’: this is an optional field of a chain instance. It allows the chaining job to set specific options for the chained job to use. For example, a chained job that reschedules the original job might take an option defining how frequently the job is rescheduled. This would be passed in the params field. (See example in Updater::Chained – Pending!)

  • ‘__self__’: this is simply set to the instance of Updater::Update that is calling the method. This might be useful for both chained and original jobs that need to manipulate or inspect the job that called them. Without this field, it would be impossible for a method to consistently determine whether it had been run from a background job or invoked directly by the app.



# File 'lib/updater/update.rb', line 123

def sub_args(job,a)
  a.map do |e| 
    begin
      case e.to_s
        when '__job__'
          job
        when '__params__'
          @params
        when '__self__'
          self
        else
          e
      end
    # For the unfortunate case where e doesn't handle to_s nicely.
    # On the other hand I dare someone to find something that can be marshaled,
    # but doesn't do #to_s.
    rescue NoMethodError=>err
      raise err unless err.message =~ /\`to_s\'/
      e
    end #begin
  end# map
end
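
A simplified sketch of the substitution follows. substitute is a hypothetical free-standing function; its keyword arguments job, params and current stand in for the chaining job, @params and self, and the NoMethodError handling of the listing is left out.

```ruby
# Hypothetical, simplified version of the substitution in Update#sub_args.
def substitute(args, job:, params:, current:)
  args.map do |e|
    case e.to_s
    when '__job__'    then job
    when '__params__' then params
    when '__self__'   then current
    else e
    end
  end
end

result = substitute(['__job__', 3, '__params__', :other],
                    job: :failed_job, params: 'hourly', current: :me)
p result # => [:failed_job, 3, "hourly", :other]
```

Because the comparison is made on e.to_s, a symbol such as :__job__ is substituted in the same way as the string.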

#targetObject

Returns the Class or instance that will receive the method call. See Updater.at for information about how a target is derived.

# File 'lib/updater/update.rb', line 55

def target
  target = @orm.finder.nil? ? @orm.target : @orm.target.send(@orm.finder,@orm.finder_args)
  raise TargetMissingError, "Target missing --Class:'#{@orm.target}' Finder:'#{@orm.finder}', Args:'#{@orm.finder_args.inspect}'" unless target
  target
end
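
The two resolution paths (class target versus found instance) can be sketched without an ORM. Foo, its STORE constant, resolve_target and the top-level TargetMissingError defined here are all hypothetical stand-ins for illustration.

```ruby
# Hypothetical model with a class-level finder, standing in for an ORM class.
class Foo
  STORE = { 1 => :first_record }.freeze

  def self.get(id)
    STORE[id]
  end
end

# Stand-in for the error class raised by Update#target.
class TargetMissingError < StandardError; end

# Mirrors the logic of Update#target: with no finder the stored class is the
# target; otherwise the finder is sent to the class with the finder args.
def resolve_target(klass, finder, finder_args)
  target = finder.nil? ? klass : klass.send(finder, finder_args)
  raise TargetMissingError, "no target for #{klass}.#{finder}(#{finder_args.inspect})" unless target
  target
end

p resolve_target(Foo, nil, nil) # => Foo
p resolve_target(Foo, :get, 1)  # => :first_record
```

A finder that returns nil, such as resolve_target(Foo, :get, 2) here, raises TargetMissingError rather than sending the method to nil.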