Module: SidekiqScheduler::Client
Defined in: lib/sidekiq-scheduler/client.rb
Instance Method Summary
- #delayed_push(queue = nil, timestamp, item) ⇒ Object
  Example usage: Sidekiq::Client.delayed_push('my_queue', Time.now + 600, 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']).
- #remove_all_delayed(klass, *args) ⇒ Object
  Example usage: Sidekiq::Client.remove_all_delayed(MyWorker, 'foo', 1, :bat => 'bar').
- #remove_all_delayed_from_queue(queue, klass, *args) ⇒ Object
  Example usage: Sidekiq::Client.remove_all_delayed_from_queue('foo', MyWorker, 'foo', 1, :bat => 'bar').
- #remove_delayed(timestamp, klass, *args) ⇒ Object
  Example usage: Sidekiq::Client.remove_delayed(Time.now + 600, MyWorker, 'foo', 1, :bat => 'bar').
- #remove_delayed_from_queue(queue, timestamp, klass, *args) ⇒ Object
  Example usage: Sidekiq::Client.remove_delayed_from_queue('foo', Time.now + 600, MyWorker, 'foo', 1, :bat => 'bar').
- #remove_scheduler_queue(timestamp) ⇒ Object
Instance Method Details
#delayed_push(queue = nil, timestamp, item) ⇒ Object
Example usage: Sidekiq::Client.delayed_push('my_queue', Time.now + 600, 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])

    # File 'lib/sidekiq-scheduler/client.rb', line 8

    def delayed_push(queue=nil, timestamp, item)
      raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeClass, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash)
      raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']

      timestamp = timestamp.to_i

      item['queue'] = queue.to_s if queue
      item['class'] = item['class'].to_s if !item['class'].is_a?(String)

      # Add item to the list for this timestamp
      Sidekiq.redis { |r| r.rpush("delayed:#{timestamp}", MultiJson.encode(item)) }

      # Add timestamp to zset. Score and value are based on the timestamp
      # as querying will be based on that
      Sidekiq.redis { |r| r.zadd('delayed_queue_schedule', timestamp, timestamp) }
    end
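The listing above writes to two Redis structures: one list per timestamp (`delayed:<timestamp>`) and one sorted set of timestamps (`delayed_queue_schedule`). The following is a minimal in-memory sketch of that layout, not the library's implementation. `DelayedStoreSketch` is a hypothetical class, plain Ruby hashes stand in for Redis, and `JSON.generate` stands in for `MultiJson.encode`.

```ruby
require 'json'

# Hypothetical in-memory model of the two Redis structures delayed_push
# writes to. Not part of sidekiq-scheduler; for illustration only.
class DelayedStoreSketch
  attr_reader :lists, :schedule

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] } # "delayed:<ts>" => [json, ...]
    @schedule = {}                                   # sorted-set member => score
  end

  # Follows the same steps as delayed_push. Unlike the original, this
  # sketch copies the caller's hash instead of modifying it in place.
  def delayed_push(queue, timestamp, item)
    raise ArgumentError, 'item must be a Hash' unless item.is_a?(Hash)
    raise ArgumentError, 'item needs class and args' if !item['class'] || !item['args']

    timestamp = timestamp.to_i
    item = item.dup
    item['queue'] = queue.to_s if queue
    item['class'] = item['class'].to_s

    @lists["delayed:#{timestamp}"] << JSON.generate(item) # RPUSH equivalent
    @schedule[timestamp.to_s] = timestamp                 # ZADD equivalent
    timestamp
  end
end

store = DelayedStoreSketch.new
run_at = Time.at(1_700_000_000)
store.delayed_push('my_queue', run_at, 'class' => 'MyWorker', 'args' => ['foo', 1])
store.delayed_push(nil, run_at, 'class' => 'MyWorker', 'args' => [])
puts store.lists.keys.inspect
puts store.schedule.inspect
```

Both pushes land in the same list because they share a timestamp, while the schedule holds a single entry for that second.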
#remove_all_delayed(klass, *args) ⇒ Object
Example usage: Sidekiq::Client.remove_all_delayed(MyWorker, 'foo', 1, :bat => 'bar')
Returns the number of jobs removed.
This method can be very expensive: it delegates to remove_all_delayed_from_queue, which scans the delayed queues of all timestamps.

    # File 'lib/sidekiq-scheduler/client.rb', line 40

    def remove_all_delayed(klass, *args)
      remove_all_delayed_from_queue(nil, klass, *args)
    end
#remove_all_delayed_from_queue(queue, klass, *args) ⇒ Object
Example usage: Sidekiq::Client.remove_all_delayed_from_queue('foo', MyWorker, 'foo', 1, :bat => 'bar')
Returns the number of jobs removed.
This method can be very expensive: it lists every delayed:* key and runs an LREM against each one, so it scans the delayed queues of all timestamps.

    # File 'lib/sidekiq-scheduler/client.rb', line 51

    def remove_all_delayed_from_queue(queue, klass, *args)
      count = 0
      item = {'class' => klass.to_s, 'args' => args}
      item['queue'] = queue.to_s if queue
      search = MultiJson.encode(item)
      Array(Sidekiq.redis { |r| r.keys("delayed:*") }).each do |key|
        count += Sidekiq.redis { |r| r.lrem(key, 0, search) }
      end
      count
    end
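Because removal uses LREM with a serialized search string, a job is only removed when the class, arguments, and queue serialize to exactly the string that was stored. The sketch below illustrates that with plain strings. `delayed_search_string` is a hypothetical helper that mirrors how the listing builds `search`, and `JSON.generate` stands in for `MultiJson.encode`.

```ruby
require 'json'

# Hypothetical helper: builds the search string the same way the removal
# methods do (class, then args, then an optional queue).
def delayed_search_string(queue, klass, *args)
  item = { 'class' => klass.to_s, 'args' => args }
  item['queue'] = queue.to_s if queue
  JSON.generate(item)
end

stored = delayed_search_string('foo', 'MyWorker', 'bar', 1)

puts stored == delayed_search_string('foo', 'MyWorker', 'bar', 1)   # true: identical job
puts stored == delayed_search_string(nil, 'MyWorker', 'bar', 1)     # false: queue omitted
puts stored == delayed_search_string('foo', 'MyWorker', 'bar', '1') # false: "1" is not 1
```

One likely consequence, assuming the encoder preserves hash insertion order: an item pushed with its keys in a different order than class, args, queue would not be matched by these removal methods.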
#remove_delayed(timestamp, klass, *args) ⇒ Object
Example usage: Sidekiq::Client.remove_delayed(Time.now + 600, MyWorker, 'foo', 1, :bat => 'bar')
Returns the number of jobs removed.

    # File 'lib/sidekiq-scheduler/client.rb', line 66

    def remove_delayed(timestamp, klass, *args)
      remove_delayed_from_queue(nil, timestamp, klass, *args)
    end
#remove_delayed_from_queue(queue, timestamp, klass, *args) ⇒ Object
Example usage: Sidekiq::Client.remove_delayed_from_queue('foo', Time.now + 600, MyWorker, 'foo', 1, :bat => 'bar')
Returns the number of jobs removed.

    # File 'lib/sidekiq-scheduler/client.rb', line 75

    def remove_delayed_from_queue(queue, timestamp, klass, *args)
      timestamp = timestamp.to_i
      item = {'class' => klass.to_s, 'args' => args}
      item['queue'] = queue.to_s if queue
      search = MultiJson.encode(item)
      count = Sidekiq.redis { |r| r.lrem("delayed:#{timestamp}", 0, search) }
      remove_scheduler_queue(timestamp)
      count
    end
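The examples pass `Time.now + 600` both when scheduling and when removing, but the list key is derived from the timestamp truncated to whole seconds. A removal evaluated even a second or two later would therefore look in a different list. The sketch below shows the effect. `delayed_key` is a hypothetical helper that mirrors the key construction in the listing.

```ruby
# Hypothetical helper mirroring how the methods derive the list key.
def delayed_key(timestamp)
  "delayed:#{timestamp.to_i}"
end

scheduled_at = Time.at(1_700_000_000) + 600
later        = Time.at(1_700_000_002) + 600 # "Time.now + 600" evaluated two seconds later

puts delayed_key(scheduled_at)                                   # delayed:1700000600
puts delayed_key(later)                                          # delayed:1700000602
puts delayed_key(scheduled_at) == delayed_key(scheduled_at.to_i) # true
```

In practice this suggests keeping the timestamp that was passed to delayed_push (or its integer value) and reusing it for removal.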
#remove_scheduler_queue(timestamp) ⇒ Object

    # File 'lib/sidekiq-scheduler/client.rb', line 25

    def remove_scheduler_queue(timestamp)
      key = "delayed:#{timestamp}"
      if 0 == Sidekiq.redis { |r| r.llen(key) }
        Sidekiq.redis { |r| r.del(key) }
        Sidekiq.redis { |r| r.zrem('delayed_queue_schedule', timestamp) }
      end
    end
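As the listing shows, remove_scheduler_queue only cleans up when the list for the timestamp is empty: it deletes the list key and drops the timestamp from `delayed_queue_schedule`. Here is a minimal in-memory sketch of that decision, assuming plain hashes in place of Redis. `SchedulerCleanupSketch` is a hypothetical class, and unlike the original it returns true or false to make the outcome visible.

```ruby
# Hypothetical in-memory model; names are illustrative, not library API.
class SchedulerCleanupSketch
  attr_reader :lists, :schedule

  def initialize(lists, schedule)
    @lists = lists       # "delayed:<ts>" => [json, ...]
    @schedule = schedule # sorted-set member => score
  end

  # Same decision as remove_scheduler_queue: clean up only when the list
  # for this timestamp holds no more items. A missing list counts as
  # empty, as LLEN returns 0 for a key that does not exist.
  def remove_scheduler_queue(timestamp)
    key = "delayed:#{timestamp}"
    return false unless @lists.fetch(key, []).empty?

    @lists.delete(key)               # DEL equivalent
    @schedule.delete(timestamp.to_s) # ZREM equivalent
    true
  end
end

sketch = SchedulerCleanupSketch.new(
  { 'delayed:100' => [], 'delayed:200' => ['{"class":"MyWorker","args":[]}'] },
  { '100' => 100, '200' => 200 }
)
puts sketch.remove_scheduler_queue(100) # true: list was empty, entry removed
puts sketch.remove_scheduler_queue(200) # false: a job is still pending
puts sketch.schedule.keys.inspect
```

Note that the listing interpolates `timestamp` into the key without calling `to_i`, so a caller invoking it directly would presumably need to pass the integer form.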