Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Server

Constant Summary collapse

Version =
'1.9.6'

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



130
131
132
133
134
135
136
# File 'lib/resque_scheduler.rb', line 130

def count_all_scheduled_jobs
  total_jobs = 0 
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp|
    total_jobs += redis.llen("delayed:#{timestamp}").to_i
  end 
  total_jobs
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false



55
56
57
58
59
60
61
62
63
# File 'lib/resque_scheduler.rb', line 55

def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



66
67
68
# File 'lib/resque_scheduler.rb', line 66

def delayed_queue_peek(start, count)
  Array(redis.zrange(:delayed_queue_schedule, start, start+count)).collect{|x| x.to_i}
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



71
72
73
# File 'lib/resque_scheduler.rb', line 71

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



81
82
83
84
85
86
87
88
# File 'lib/resque_scheduler.rb', line 81

def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



76
77
78
# File 'lib/resque_scheduler.rb', line 76

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



41
42
43
# File 'lib/resque_scheduler.rb', line 41

def enqueue_at(timestamp, klass, *args)
  delayed_push(timestamp, job_to_hash(klass, args))
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



47
48
49
# File 'lib/resque_scheduler.rb', line 47

def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#next_delayed_timestampObject

Returns the next delayed queue timestamp (don’t call directly)



92
93
94
95
96
# File 'lib/resque_scheduler.rb', line 92

def next_delayed_timestamp
  items = redis.zrangebyscore :delayed_queue_schedule, '-inf', Time.now.to_i, :limit => [0, 1]
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



101
102
103
104
105
106
107
108
109
# File 'lib/resque_scheduler.rb', line 101

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#remove_delayed(klass, *args) ⇒ Object

given an encoded item, remove it from the delayed_queue



121
122
123
124
125
126
127
128
# File 'lib/resque_scheduler.rb', line 121

def remove_delayed(klass, *args)
  destroyed = 0
  search = encode(job_to_hash(klass, args))
  Array(redis.keys("delayed:*")).each do |key|
    destroyed += redis.lrem key, 0, search
  end
  destroyed
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



112
113
114
115
116
117
118
# File 'lib/resque_scheduler.rb', line 112

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    redis.del "delayed:#{item}"
  end

  redis.del :delayed_queue_schedule
end

#scheduleObject

Returns the schedule hash



33
34
35
# File 'lib/resque_scheduler.rb', line 33

def schedule
  @schedule ||= {}
end

#schedule=(schedule_hash) ⇒ Object

Accepts a new schedule configuration of the form:

{some_name => {"cron" => "5/* * * *",
               "class" => DoSomeWork,
               "args" => "work on this string",
               "description" => "this thing works it"s butter off"},
 ...}

:name can be anything and is used only to describe the scheduled job :cron can be any cron scheduling string :job can be any resque job class :class must be a resque worker class :args can be any yaml which will be converted to a ruby literal and passed

in a params. (optional)

:rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional) :description is just that, a description of the job (optional). If params is

an array, each element in the array is passed as a separate param,
otherwise params is passed in as the only parameter to perform.


28
29
30
# File 'lib/resque_scheduler.rb', line 28

def schedule=(schedule_hash)
  @schedule = schedule_hash
end