Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb,
lib/resque_scheduler/search_delayed.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Server

Constant Summary collapse

Version =
'2.0.3'

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



216
217
218
219
220
221
222
# File 'lib/resque_scheduler.rb', line 216

# Counts every job stored in the delayed queue, across all timestamps.
#
# Walks each document returned by delayed_queue.find and adds up the length
# of its 'items' array; a document without an 'items' key contributes zero.
#
# Returns the grand total.
def count_all_scheduled_jobs
  delayed_queue.find.inject(0) do |running_total, timestamp_doc|
    jobs = timestamp_doc['items'] || []
    running_total + jobs.size
  end
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. Insertion is O(log(n)). Returns the number of items now stored for that timestamp (1 means this was the first job to be scheduled at that time).

Returns:

  • the number of items for this timestamp



133
134
135
136
137
138
139
140
141
142
# File 'lib/resque_scheduler.rb', line 133

# Appends +item+ to the list of jobs stored under +timestamp+.
#
# timestamp - anything responding to to_i; its integer value is the
#             document '_id'.
# item      - the job payload pushed onto the document's items array.
#
# The document is created when it does not exist yet (:upsert) and the
# post-update version is requested (:new), so the returned size already
# includes +item+.
#
# Returns the number of items now stored for this timestamp.
def delayed_push(timestamp, item)
  queue = delayed_queue
  selector = {'_id' => timestamp.to_i}
  append_item = {'$push' => {:items => item}}
  updated = queue.find_and_modify(:query => selector, :update => append_item, :upsert => true, :new => true)
  updated['items'].size
end

#delayed_queueObject



22
23
24
25
# File 'lib/resque_scheduler.rb', line 22

# Memoized handle on the 'delayed_queue' collection.
#
# When self.mongo is unset it is first assigned ENV['MONGO'], falling back to
# 'localhost:27017'. NOTE(review): presumably the mongo= writer is what
# populates @db -- confirm against its definition.
def delayed_queue
  self.mongo = ENV['MONGO'] || 'localhost:27017' unless self.mongo
  @delayed_queue = @db.collection('delayed_queue') unless @delayed_queue
  @delayed_queue
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



145
146
147
# File 'lib/resque_scheduler.rb', line 145

# Lists a page of timestamps ('_id' values) from the delayed queue.
#
# start - number of documents to skip.
# count - maximum number of documents to return.
#
# Only the '_id' field is requested, sorted ascending on '_id'.
#
# Returns an array of '_id' values.
def delayed_queue_peek(start, count)
  paging = {:skip => start, :limit => count, :fields => '_id', :sort => ['_id', 1]}
  page = delayed_queue.find({}, paging)
  page.map { |entry| entry['_id'] }
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



150
151
152
# File 'lib/resque_scheduler.rb', line 150

# Size of the delayed queue schedule: whatever +count+ reports for the
# delayed_queue collection (one document per timestamp).
def delayed_queue_schedule_size
  timestamps = delayed_queue
  timestamps.count
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



161
162
163
164
165
166
167
# File 'lib/resque_scheduler.rb', line 161

# Returns a window of the delayed items stored for +timestamp+.
#
# timestamp - anything responding to to_i; its integer value is the '_id'.
# start     - first element of the '$slice' projection on 'items'.
# count     - second element of the '$slice' projection on 'items'.
#
# Returns the sliced items array, or [] when there is no document for the
# timestamp or the document has no 'items'.
def delayed_timestamp_peek(timestamp, start, count)
  window = {'items' => {'$slice' => [start, count]}}
  found = delayed_queue.find_one({'_id' => timestamp.to_i}, :fields => window)
  return [] unless found

  found['items'] || []
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



155
156
157
158
# File 'lib/resque_scheduler.rb', line 155

# Number of jobs scheduled for +timestamp+ in the delayed queue.
#
# timestamp - anything responding to to_i; its integer value is the '_id'.
#
# Returns 0 when no document exists for the timestamp or it has no 'items'.
def delayed_timestamp_size(timestamp)
  found = delayed_queue.find_one('_id' => timestamp.to_i)
  return 0 unless found

  jobs = found['items'] || []
  jobs.size
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.

Returns:

  • the number of items for this timestamp



117
118
119
# File 'lib/resque_scheduler.rb', line 117

# Schedules a job for +timestamp+.
#
# The job is encoded with job_to_hash(klass, args) and handed to
# delayed_push, whose result is returned unchanged.
#
# Returns the number of items for this timestamp.
def enqueue_at(timestamp, klass, *args)
  payload = job_to_hash(klass, args)
  delayed_push(timestamp, payload)
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.

Returns:

  • the number of items for this timestamp



124
125
126
# File 'lib/resque_scheduler.rb', line 124

# Schedules a job relative to the current time.
#
# Adds +number_of_seconds_from_now+ to Time.now and delegates to enqueue_at
# with the same class and arguments.
#
# Returns whatever enqueue_at returns.
def enqueue_in(number_of_seconds_from_now, klass, *args)
  run_at = Time.now + number_of_seconds_from_now
  enqueue_at(run_at, klass, *args)
end

#get_schedule(name) ⇒ Object

retrieve the schedule configuration for the given name



90
91
92
93
94
# File 'lib/resque_scheduler.rb', line 90

# Looks up the stored schedule configuration whose '_id' is +name+.
#
# The '_id' key is stripped from the document before it is handed back.
#
# Returns the configuration, or the falsy find_one result when nothing is
# stored under that name.
def get_schedule(name)
  stored = schedules.find_one('_id' => name)
  return stored unless stored

  stored.delete('_id')
  stored
end

#get_schedulesObject

gets the schedule as it exists in mongo



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/resque_scheduler.rb', line 67

# Reads every stored schedule from the schedules collection.
#
# Returns a hash keyed by each document's '_id' (which is removed from the
# document itself), or nil when the collection reports a count of zero.
def get_schedules
  return nil unless schedules.count > 0

  schedules.find.inject({}) do |by_name, entry|
    by_name[entry.delete('_id')] = entry
    by_name
  end
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



171
172
173
174
175
176
177
# File 'lib/resque_scheduler.rb', line 171

# Finds the next delayed-queue timestamp that is due (don't call directly).
#
# at_time - reference time; defaults to Time.now when nil.
#
# Queries for a document whose '_id' is <= the reference time (as an
# integer), sorted ascending on '_id'.
#
# Returns that document's '_id', or nil when nothing is due.
def next_delayed_timestamp(at_time=nil)
  queue = delayed_queue
  due_by = (at_time || Time.now).to_i
  earliest = queue.find_one({'_id' => {'$lte' => due_by}}, :sort => ['_id', Mongo::ASCENDING])
  return nil unless earliest

  earliest['_id']
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, or nil if done (don’t call directly). timestamp can be either in seconds or a datetime object.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/resque_scheduler.rb', line 182

# Removes and returns the next item scheduled for +timestamp+
# (don't call directly).
#
# timestamp - anything responding to to_i; its integer value is the '_id'.
#
# Returns the item that was at the front of the list, or nil when there is
# nothing left for this timestamp (no document, no 'items', or an empty list).
def next_item_for_timestamp(timestamp)
  # find_and_modify is called without :new, so the result is read as the
  # document from BEFORE the $pop: its first element is the one just shifted.
  previous = delayed_queue.find_and_modify(
    :query => {'_id' => timestamp.to_i},
    :update => {'$pop' => {'items' => -1}} # -1 means shift
  )

  # "No matching document" may be reported as a nil result rather than as an
  # OperationFailure (see the rescue below); both mean there is nothing to do.
  return nil unless previous

  item = (previous['items'] || []).first

  # If the list is empty, remove it.
  clean_up_timestamp(timestamp)

  item
rescue Mongo::OperationFailure
  # Database command 'findandmodify' failed: {"errmsg"=>"No matching object found", "ok"=>0.0}
  nil
end

#pop_schedules_changedObject



102
103
104
105
106
107
108
109
110
# File 'lib/resque_scheduler.rb', line 102

# Drains the schedules_changed collection, yielding the '_id' of each
# document as it is removed.
#
# Stops when find_and_modify returns a falsy value or raises
# Mongo::OperationFailure. Note the rescue covers the whole loop, so an
# OperationFailure raised from the caller's block ends the drain as well.
#
# Returns nil.
def pop_schedules_changed
  begin
    while (changed = schedules_changed.find_and_modify(:remove => true))
      yield changed['_id']
    end
  rescue Mongo::OperationFailure
    # "Database command 'findandmodify' failed: {"errmsg"=>"No matching object found", "ok"=>0.0}"
    # The mongo driver raises (with a global exception class) instead of
    # returning nil when the collection is empty.
    nil
  end
end

#reload_schedule!Object

reloads the schedule from mongo



62
63
64
# File 'lib/resque_scheduler.rb', line 62

# Replaces the in-memory schedule (@schedule) with whatever get_schedules
# returns, and returns that value.
def reload_schedule!
  fresh = get_schedules
  @schedule = fresh
end

#remove_delayed(klass, *args) ⇒ Object

Given a job class and its arguments, removes every matching encoded item from the delayed_queue. Unlike next_item_for_timestamp, it does not clean up timestamps that are left empty. TODO: unlike resque-scheduler, it does not return the number of removed items; find_and_modify can’t be used because it only updates one item.



208
209
210
211
212
213
214
# File 'lib/resque_scheduler.rb', line 208

# Pulls the job encoded by job_to_hash(klass, args) out of the 'items' array
# of every document in the delayed queue (empty selector, :multi => true).
#
# Returns the result of the collection's update call.
def remove_delayed(klass, *args)
  queue = delayed_queue
  pull_job = {'$pull' => {'items' => job_to_hash(klass, args)}}
  queue.update({}, pull_job, :multi => true)
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



97
98
99
100
# File 'lib/resque_scheduler.rb', line 97

# Removes the schedule stored under +name+ and records the name in
# schedules_changed.
#
# The change marker is written with save (an upsert keyed on '_id') rather
# than insert, so removing a schedule whose previous change has not been
# popped yet does not fail with a duplicate-key error.
def remove_schedule(name)
  schedules.remove('_id' => name)
  schedules_changed.save('_id' => name)
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



200
201
202
# File 'lib/resque_scheduler.rb', line 200

# Clears all jobs created with enqueue_at or enqueue_in by calling +remove+
# with no selector on the delayed_queue collection.
def reset_delayed_queue
  queue = delayed_queue
  queue.remove
end

#scheduleObject

Returns the schedule hash



57
58
59
# File 'lib/resque_scheduler.rb', line 57

# Returns the in-memory schedule hash, initializing it to an empty hash the
# first time (or whenever @schedule is falsy).
def schedule
  @schedule = {} unless @schedule
  @schedule
end

#schedule=(schedule_hash) ⇒ Object

Accepts a new schedule configuration of the form:

{some_name => {"cron" => "*/5 * * * *",
               "class" => DoSomeWork,
               "args" => "work on this string",
               "description" => "this thing works it's butter off"},
 ...}

:name can be anything and is used only to describe the scheduled job.

:cron can be any cron scheduling string.

:job can be any resque job class.

:every can be used in lieu of :cron. See rufus-scheduler’s ‘every’ usage for valid syntax. If :cron is present it will take precedence over :every.

:class must be a resque worker class.

:args can be any yaml which will be converted to a ruby literal and passed in as params (optional).

:rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional).

:description is just that, a description of the job (optional).

If params is an array, each element in the array is passed as a separate param, otherwise params is passed in as the only parameter to perform.


47
48
49
50
51
52
53
54
# File 'lib/resque_scheduler.rb', line 47

# Installs a new schedule configuration.
#
# schedule_hash - mapping of schedule name => job spec.
#
# When Resque::Scheduler.dynamic is truthy, every entry is first passed to
# set_schedule; the hash is then kept in @schedule either way.
def schedule=(schedule_hash)
  schedule_hash.each { |job_name, spec| set_schedule(job_name, spec) } if Resque::Scheduler.dynamic
  @schedule = schedule_hash
end

#schedulesObject



12
13
14
15
# File 'lib/resque_scheduler.rb', line 12

# Memoized handle on the 'schedules' collection.
#
# When self.mongo is unset it is first assigned ENV['MONGO'], falling back to
# 'localhost:27017'. NOTE(review): presumably the mongo= writer is what
# populates @db -- confirm against its definition.
def schedules
  self.mongo = ENV['MONGO'] || 'localhost:27017' unless self.mongo
  @schedules = @db.collection('schedules') unless @schedules
  @schedules
end

#schedules_changedObject



17
18
19
20
# File 'lib/resque_scheduler.rb', line 17

# Memoized handle on the 'schedules_changed' collection.
#
# When self.mongo is unset it is first assigned ENV['MONGO'], falling back to
# 'localhost:27017'. NOTE(review): presumably the mongo= writer is what
# populates @db -- confirm against its definition.
def schedules_changed
  self.mongo = ENV['MONGO'] || 'localhost:27017' unless self.mongo
  @schedules_changed = @db.collection('schedules_changed') unless @schedules_changed
  @schedules_changed
end

#search_delayed(query, start = 0, count = 1) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/resque_scheduler/search_delayed.rb', line 7

# Searches the delayed queue for timestamps whose jobs match EVERY
# whitespace-separated term of +query+.
#
# query - search string; each term is used as a case-insensitive regexp.
#         An invalid pattern raises RegexpError.
# start - offset of the first result to return (coerced with Integer()).
# count - maximum number of results to return (coerced with Integer()).
#
# A term matches a timestamp when at least one of its jobs has a 'class' or
# 'queue' matching the term, or an argument matching it: for Hash arguments
# the keys and values are checked, for Array arguments the elements, and any
# other argument is checked directly. Only String and Symbol values are
# compared against the pattern.
#
# The complete list of matching timestamps is kept in @@search_results (read
# by search_delayed_count); results are in the order the collection scan
# returned them.
#
# Returns the requested slice of matching timestamp '_id's ([] when none).
def search_delayed(query, start = 0, count = 1)
  if query.nil? || query.empty?
    @@search_results = []
    return []
  end

  start, count = [start, count].map { |n| Integer(n) }

  # Compile each term once instead of once per inspected field.
  patterns = query.split.map { |term| Regexp.new(term, Regexp::IGNORECASE) }

  text_match = lambda do |value, pattern|
    (value.is_a?(String) || value.is_a?(Symbol)) && !(value =~ pattern).nil?
  end

  job_match = lambda do |job, pattern|
    text_match.call(job['class'], pattern) ||
      text_match.call(job['queue'], pattern) ||
      (job['args'] || []).any? do |arg|
        fields = case arg
                 when Hash then arg.keys + arg.values
                 when Array then arg
                 else [arg]
                 end
        fields.any? { |field| text_match.call(field, pattern) }
      end
  end

  matches = []
  # A whitespace-only query has no terms and therefore matches nothing.
  unless patterns.empty?
    # Single scan: a timestamp is kept only if every term matches one of its
    # jobs. This keeps AND semantics even when an intermediate term matches
    # nothing at all.
    delayed_queue.find.each do |row|
      jobs = row['items'] || []
      every_term_hits = patterns.all? do |pattern|
        jobs.any? { |job| job_match.call(job, pattern) }
      end
      matches << row['_id'] if every_term_hits
    end
  end

  @@search_results = matches
  matches[start, count] || []
end

#search_delayed_countObject



3
4
5
# File 'lib/resque_scheduler/search_delayed.rb', line 3

# Number of timestamps matched by the most recent search_delayed call.
#
# Returns 0 when no search has stored its results in @@search_results yet,
# instead of raising NameError for the uninitialized class variable.
def search_delayed_count
  return 0 unless defined?(@@search_results)

  @@search_results.count
end

#set_schedule(name, config) ⇒ Object

create or update a schedule with the provided name and configuration



80
81
82
83
84
85
86
87
# File 'lib/resque_scheduler.rb', line 80

# Creates or updates the schedule stored under +name+.
#
# name   - the schedule's name, used as the document '_id'.
# config - the schedule configuration hash.
#
# Nothing is written when the stored configuration already equals +config+.
# Otherwise the configuration is written with save (an upsert keyed on
# '_id'), so an existing schedule is actually replaced; a plain insert with
# an already-used '_id' cannot update the stored document. The name is also
# recorded in schedules_changed, again with save so that a still-pending
# marker for the same name is not a duplicate-key failure.
#
# Returns +config+.
def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    schedules.save(config.merge('_id' => name))
    schedules_changed.save('_id' => name)
  end
  config
end