Module: Qu::Extensions::Scheduler::Redis
- Defined in:
- lib/qu/extensions/scheduler/redis.rb
Instance Method Summary collapse
- #count_all_scheduled_jobs ⇒ Object
-
#delayed_push(timestamp, payload) ⇒ Object
Used internally to stuff the item into the schedule sorted list.
-
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count.
-
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule.
-
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule.
-
#get_schedule(name) ⇒ Object
retrieve the schedule configuration for the given name.
-
#get_schedules ⇒ Object
gets the schedule as it exists in redis.
-
#list_range(key, start = 0, count = 1) ⇒ Object
Does the dirty work of fetching a range of items from a Redis list and converting them into Ruby objects.
-
#load_schedule! ⇒ Object
Pulls the schedule from Qu.schedule and loads it into the rufus scheduler instance.
-
#next_delayed_timestamp(at_time = nil) ⇒ Object
Returns the next delayed queue timestamp (don’t call directly).
-
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done.
-
#remove_delayed(klass, *args) ⇒ Object
Given an encoded item, remove it from the delayed_queue.
-
#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object
Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.
-
#remove_schedule(name) ⇒ Object
remove a given schedule by name.
-
#reset_delayed_queue ⇒ Object
Clears all jobs created with enqueue_at or enqueue_in.
-
#set_schedule(name, config) ⇒ Object
Create or update a schedule with the provided name and configuration.
- #update_schedule ⇒ Object
Instance Method Details
#count_all_scheduled_jobs ⇒ Object
178 179 180 181 182 183 184 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 178
#
# Counts every job currently waiting in the delayed queue.
#
# Reads all members of the :delayed_queue_schedule sorted set and adds up
# the lengths of the corresponding "delayed:<member>" lists.
#
# Returns the total as an Integer (0 when the schedule has no members).
def count_all_scheduled_jobs
  # Array() turns a nil reply into an empty list so the fold below is safe.
  timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
  timestamps.inject(0) do |total_jobs, timestamp|
    total_jobs + redis.llen("delayed:#{timestamp}").to_i
  end
end
#delayed_push(timestamp, payload) ⇒ Object
Used internally to stuff the item into the schedule sorted list. timestamp
can be either in seconds or a datetime object. Insertion is O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false.
98 99 100 101 102 103 104 105 106 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 98
#
# Used internally to stuff the item into the schedule sorted list.
#
# timestamp - seconds or a datetime-like object; it is converted with to_i.
# payload   - object responding to klass and args; both are stored in the
#             encoded list entry (klass as a String).
#
# Returns the result of the final redis.zadd call.
def delayed_push(timestamp, payload)
  score = timestamp.to_i

  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{score}", encode('klass' => payload.klass.to_s, 'args' => payload.args))

  # Now, add this timestamp to the zsets. The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd(:delayed_queue_schedule, score, score)
end
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count
109 110 111 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 109
#
# Returns an array of timestamps based on start and count.
#
# start - zero-based rank of the first member to read from the
#         :delayed_queue_schedule sorted set.
# count - maximum number of timestamps to return.
#
# Returns an Array of Integers, at most count long.
def delayed_queue_peek(start, count)
  # A stop index of start - 1 could be -1, which ZRANGE reads as "to the end".
  return [] if count < 1

  # ZRANGE's stop index is inclusive, so the last wanted rank is
  # start + count - 1 (the same arithmetic list_range uses for LRANGE).
  Array(redis.zrange(:delayed_queue_schedule, start, start + count - 1)).map { |member| member.to_i }
end
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule
114 115 116 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 114
#
# Returns the size of the delayed queue schedule, i.e. the cardinality of
# the :delayed_queue_schedule sorted set as reported by redis.zcard.
def delayed_queue_schedule_size
  redis.zcard(:delayed_queue_schedule)
end
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule
119 120 121 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 119
#
# Returns the number of jobs for a given timestamp in the delayed queue
# schedule.
#
# timestamp - seconds or a datetime-like object; it is converted with to_i
#             to build the "delayed:<seconds>" list key.
#
# Returns the list length as an Integer.
def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end
#get_schedule(name) ⇒ Object
retrieve the schedule configuration for the given name
49 50 51 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 49
#
# Retrieve the schedule configuration for the given name.
#
# name - field to look up in the :schedules hash.
#
# Returns the stored value after passing it through decode.
def get_schedule(name)
  raw_config = redis.hget(:schedules, name)
  decode(raw_config)
end
#get_schedules ⇒ Object
gets the schedule as it exists in redis
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 18
#
# Gets the schedule as it exists in redis.
#
# Returns the :schedules hash with every value decoded, or nil when
# redis.exists reports the key as absent.
def get_schedules
  # NOTE(review): this relies on redis.exists answering with a true/false
  # value; a client that answers with an Integer count would make 0 truthy
  # here -- confirm against the redis client version in use.
  return nil unless redis.exists(:schedules)

  redis.hgetall(:schedules).tap do |schedules|
    # Only existing keys are reassigned, so the hash is safe to update
    # while it is being walked.
    schedules.each_key { |name| schedules[name] = decode(schedules[name]) }
  end
end
#list_range(key, start = 0, count = 1) ⇒ Object
Does the dirty work of fetching a range of items from a Redis list and converting them into Ruby objects.
7 8 9 10 11 12 13 14 15 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 7
#
# Does the dirty work of fetching a range of items from a Redis list and
# converting them into Ruby objects.
#
# key   - name of the list.
# start - index of the first item (default 0).
# count - number of items to fetch (default 1).
#
# Returns a single decoded item when count is 1, otherwise an Array of
# decoded items.
def list_range(key, start = 0, count = 1)
  return decode(redis.lindex(key, start)) if count == 1

  last = start + count - 1
  Array(redis.lrange(key, start, last)).map { |item| decode(item) }
end
#load_schedule! ⇒ Object
Pulls the schedule from Qu.schedule and loads it into the rufus scheduler instance
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 77
#
# Pulls the schedule from Qu.schedule and loads it into the rufus scheduler
# instance.
#
# Resets @@scheduled_jobs, hands every (name, config) pair to
# Qu::Scheduler.load_schedule_job and then deletes the :schedules_changed
# key. The process title is set to "loading schedule" while this runs and
# to "running" once it is done.
def load_schedule!
  Qu::Scheduler.set_process_title "loading schedule"

  # Need to load the schedule from redis for the first time if dynamic
  if Qu::Scheduler.dynamic
    Qu.reload_schedule!
  end

  if Qu.schedule.empty?
    # NOTE(review): Ruby's stdlib Logger exposes #warn, not #warning --
    # confirm that Qu.logger actually responds to #warning.
    Qu.logger.warning("Schedule empty! Set Qu.schedule")
  end

  @@scheduled_jobs = {}

  Qu.schedule.each do |name, config|
    Qu::Scheduler.load_schedule_job(name, config)
  end

  redis.del(:schedules_changed)
  Qu::Scheduler.set_process_title "running"
end
#next_delayed_timestamp(at_time = nil) ⇒ Object
Returns the next delayed queue timestamp (don’t call directly)
125 126 127 128 129 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 125
#
# Returns the next delayed queue timestamp (don't call directly).
#
# at_time - upper bound for the lookup, converted with to_i; defaults to
#           Time.now when nil.
#
# Returns the first member of :delayed_queue_schedule whose score is at or
# below the bound, as an Integer, or nil when there is none.
def next_delayed_timestamp(at_time = nil)
  upper_bound = (at_time || Time.now).to_i
  items = redis.zrangebyscore(:delayed_queue_schedule, '-inf', upper_bound, :limit => [0, 1])

  # Array() covers both a nil reply and an empty reply.
  timestamp = Array(items).first
  timestamp.to_i unless timestamp.nil?
end
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp
can either be in seconds or a datetime
134 135 136 137 138 139 140 141 142 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 134
#
# Returns the next item to be processed for a given timestamp, nil if done.
# (don't call directly)
#
# timestamp - seconds or a datetime-like object; it is converted with to_i
#             to build the "delayed:<seconds>" list key.
#
# Returns the decoded item popped from the head of that list.
def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  # NOTE(review): the helper's name was lost from the rendered listing;
  # clean_up_timestamp is presumed -- confirm against the real source file.
  clean_up_timestamp(key, timestamp)
  item
end
#remove_delayed(klass, *args) ⇒ Object
Given an encoded item, remove it from the delayed_queue
This method is potentially very expensive since it needs to scan through the delayed queue for every timestamp.
157 158 159 160 161 162 163 164 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 157
#
# Removes every delayed occurrence of the given job (klass + args).
#
# Walks every key matching "delayed:*" and removes all list entries equal
# to the encoded job, so the cost grows with the number of such keys.
#
# klass - job class; stored by its to_s form.
# args  - job arguments.
#
# Returns the sum of the counts reported by redis.lrem.
def remove_delayed(klass, *args)
  search = encode('klass' => klass.to_s, 'args' => args)

  Array(redis.keys("delayed:*")).inject(0) do |destroyed, key|
    destroyed + redis.lrem(key, 0, search)
  end
end
#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object
Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.
O(N) where N is the number of jobs scheduled to fire at the given timestamp
171 172 173 174 175 176 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 171
#
# Given a timestamp and job (klass + args) it removes all instances and
# returns the count of jobs removed.
#
# timestamp - seconds or a datetime-like object; it is converted with to_i
#             to build the "delayed:<seconds>" list key.
# klass     - job class; matched by its to_s form.
# args      - job arguments.
#
# Returns the count reported by redis.lrem.
def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  key = "delayed:#{timestamp.to_i}"
  count = redis.lrem(key, 0, encode('klass' => klass.to_s, 'args' => args))

  # NOTE(review): the helper's name was lost from the rendered listing;
  # clean_up_timestamp is presumed -- confirm against the real source file.
  clean_up_timestamp(key, timestamp)
  count
end
#remove_schedule(name) ⇒ Object
remove a given schedule by name
54 55 56 57 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 54
#
# Remove a given schedule by name.
#
# Deletes the name from the :schedules hash, then records it in the
# :schedules_changed set.
#
# Returns the result of the redis.sadd call.
def remove_schedule(name)
  redis.hdel :schedules, name
  redis.sadd :schedules_changed, name
end
#reset_delayed_queue ⇒ Object
Clears all jobs created with enqueue_at or enqueue_in
145 146 147 148 149 150 151 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 145
#
# Clears all jobs created with enqueue_at or enqueue_in.
#
# Deletes the "delayed:<member>" list for every member of the
# :delayed_queue_schedule sorted set, then deletes the sorted set itself.
#
# Returns the result of the final redis.del call.
def reset_delayed_queue
  scheduled = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
  scheduled.each { |item| redis.del("delayed:#{item}") }

  redis.del(:delayed_queue_schedule)
end
#set_schedule(name, config) ⇒ Object
Create or update a schedule with the provided name and configuration.
Note: values for class and custom_job_class need to be strings, not constants.
Qu.set_schedule('some_job', { :class => 'SomeJob',
:every => '15mins',
:queue => 'high',
:args => '/tmp/poop' })
39 40 41 42 43 44 45 46 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 39
#
# Create or update a schedule with the provided name and configuration.
#
# The encoded config is written to the :schedules hash and the name is
# added to the :schedules_changed set, unless a config equal to the given
# one is already stored under that name.
#
# name   - schedule name (hash field).
# config - schedule configuration.
#
# Returns config.
def set_schedule(name, config)
  stored = get_schedule(name)

  if !stored || stored != config
    redis.hset(:schedules, name, encode(config))
    redis.sadd(:schedules_changed, name)
  end

  config
end
#update_schedule ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/qu/extensions/scheduler/redis.rb', line 59
#
# Applies pending schedule changes.
#
# Does nothing when the :schedules_changed set is empty. Otherwise it
# reloads Qu.schedule and pops names from the set one at a time: each one
# is unscheduled, and then loaded again if it is still a key of
# Qu.schedule. The process title is "updating schedule" while this runs
# and "running" afterwards.
def update_schedule
  return unless redis.scard(:schedules_changed) > 0

  Qu::Scheduler.set_process_title "updating schedule"
  Qu.reload_schedule!

  while (schedule_name = redis.spop(:schedules_changed))
    # Decide before unscheduling so the lookup happens first.
    still_scheduled = Qu.schedule.keys.include?(schedule_name)

    Qu::Scheduler.unschedule_job(schedule_name)
    if still_scheduled
      Qu::Scheduler.load_schedule_job(schedule_name, Qu.schedule[schedule_name])
    end
  end

  Qu::Scheduler.set_process_title "running"
end