Class: Updater::ORM::DataMapper

Inherits:
Object
  • Object
show all
Includes:
DataMapper::Resource
Defined in:
lib/updater/orm/datamapper.rb

Constant Summary collapse

FINDER =
:get
ID =
:id

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.after_forkObject



189
190
191
# File 'lib/updater/orm/datamapper.rb', line 189

def after_fork
  
end

.before_forkObject

For pooled connections it is necessary to empty the pool of the parents connections so that they do not comtiminate the child pool. Note that while Datamapper is thread safe, it is not safe accross a process fork.



182
183
184
185
186
187
# File 'lib/updater/orm/datamapper.rb', line 182

def before_fork
  return unless (defined? ::DataObjects::Pooling)
  return if ::DataMapper.repository.adapter.to_s =~ /Sqlite3Adapter/
  ::DataMapper.logger.debug "+-+-+-+-+ Cleaning up connection pool (#{::DataObjects::Pooling.pools.length}) +-+-+-+-+"
  ::DataObjects::Pooling.pools.each {|p| p.dispose} 
end

.clear_allObject



149
150
151
152
# File 'lib/updater/orm/datamapper.rb', line 149

def clear_all
  all.destroy!
  DMChained.all.destroy!
end

.clear_locks(worker) ⇒ Object



145
146
147
# File 'lib/updater/orm/datamapper.rb', line 145

def clear_locks(worker)
  all(:lock_name=>worker.name).update(:lock_name=>nil)
end

.currentObject



104
105
106
# File 'lib/updater/orm/datamapper.rb', line 104

def current
  all(:time.lte=>tnow, :lock_name=>nil)
end

.current_loadObject



108
# File 'lib/updater/orm/datamapper.rb', line 108

def current_load;current.count;end

.delayedObject



110
111
112
# File 'lib/updater/orm/datamapper.rb', line 110

def delayed
  all(:time.gt=>tnow).count
end

.for(mytarget, myfinder, myfinder_args, myname = nil) ⇒ Object



154
155
156
157
158
159
160
161
162
# File 'lib/updater/orm/datamapper.rb', line 154

def for(mytarget, myfinder, myfinder_args, myname=nil)
  search = all(
      :target=>mytarget,
      :finder=>myfinder,
      :finder_args=>myfinder_args.to_yaml, 
      :lock_name=>nil
    )
  myname ? search.all(:name=>myname ) : search
end

.future(start, finish) ⇒ Object



114
115
116
# File 'lib/updater/orm/datamapper.rb', line 114

def future(start, finish)
  all(:time.gt=>start+tnow,:time.lt=>finish+tnow).count
end

.lock_next(worker) ⇒ Object

Returns the Locked Job or nil if no jobs were availible.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/updater/orm/datamapper.rb', line 126

def lock_next(worker)
  updates = worker_set
  unless updates.empty?
    #concept copied form delayed_job.  If there are a number of 
    #different processes working on the queue, the niave approch
    #would result in every instance trying to lock the same record.
    #by shuffleing our results we greatly reduce the chances that
    #multilpe workers try to lock the same process
    updates = updates.to_a.sort_by{rand()}
    updates.each do |u|
      return u if u.lock(worker)
    end
    return nil
  end
rescue DataObjects::ConnectionError
  sleep 0.1
  retry
end

.loggerObject



172
173
174
# File 'lib/updater/orm/datamapper.rb', line 172

def logger
  ::DataMapper.logger
end

.logger=(input) ⇒ Object



176
177
178
# File 'lib/updater/orm/datamapper.rb', line 176

def logger=(input)
  ::DataMapper.logger = input
end

.queue_timeObject



118
119
120
121
122
123
# File 'lib/updater/orm/datamapper.rb', line 118

def queue_time
  nxt = self.first(:time.not=>nil,:lock_name=>nil, :order=>[:time.asc])
  return nil unless nxt
  return 0 if nxt.time <= tnow
  return nxt.time - tnow
end

.setup(options) ⇒ Object

For the server only, setup the connection to the database



165
166
167
168
169
170
# File 'lib/updater/orm/datamapper.rb', line 165

def setup(options)
  ::DataMapper.logger = options.delete(:logger)
  auto_migrate = options.delete(:auto_migrate)
  ::DataMapper.setup(:default,options)
  ::DataMapper.auto_migrate! if auto_migrate
end

Instance Method Details

#lock(worker) ⇒ Object

attempt to lock this record for the worker



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/updater/orm/datamapper.rb', line 40

def lock(worker)
  return true if locked? && locked_by == worker.name
  #all this to make sure the check and the lock are simultanious:
  cnt = repository.update({properties[:lock_name]=>worker.name},self.class.all(:id=>self.id,:lock_name=>nil))
  if 0 != cnt
    @lock_name = worker.name
    true
  else
    worker.say( "Worker #{worker.name} Failed to aquire lock on job #{id}" )
    false
  end
end

#locked?Boolean

Useful, but not in API

Returns:

  • (Boolean)


94
95
96
# File 'lib/updater/orm/datamapper.rb', line 94

def locked?
  not @lock_name.nil?
end

#locked_byObject

Useful, but not in API



99
100
101
# File 'lib/updater/orm/datamapper.rb', line 99

def locked_by
  @lock_name
end

#methodObject



35
36
37
# File 'lib/updater/orm/datamapper.rb', line 35

def method
  self[:method]
end