Module: SidekiqScheduler::Client

Defined in:
lib/sidekiq-scheduler/client.rb

Instance Method Summary collapse

Instance Method Details

#delayed_push(queue = nil, timestamp, item) ⇒ Object

Example usage: Sidekiq::Client.delayed_push(‘my_queue’, Time.now + 600, ‘class’ => MyWorker, ‘args’ => [‘foo’, 1, :bat => ‘bar’])

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/sidekiq-scheduler/client.rb', line 8

def delayed_push(queue=nil, timestamp, item)
  raise(ArgumentError, "Message must be a Hash of the form: { 'class' => SomeClass, 'args' => ['bob', 1, :foo => 'bar'] }") unless item.is_a?(Hash)
  raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']

  timestamp = timestamp.to_i

  item['queue'] = queue.to_s if queue
  item['class'] = item['class'].to_s if !item['class'].is_a?(String)

  # Add item to the list for this timestamp
  Sidekiq.redis { |r| r.rpush("delayed:#{timestamp}", MultiJson.encode(item)) }

  # Add timestamp to zset. Score and value are based on the timestamp
  # as querying will be based on that
  Sidekiq.redis { |r| r.zadd('delayed_queue_schedule', timestamp, timestamp) }
end

#remove_all_delayed(klass, *args) ⇒ Object

Example usage: Sidekiq::Client.remove_all_delayed(MyWorker, ‘foo’, 1, :bat => ‘bar’)

Returns the number of jobs removed

This method can be very expensive since it needs to scan through the delayed queues of all timestamps



40
41
42
# File 'lib/sidekiq-scheduler/client.rb', line 40

def remove_all_delayed(klass, *args)
  remove_all_delayed_from_queue(nil, klass, *args)
end

#remove_all_delayed_from_queue(queue, klass, *args) ⇒ Object

Example usage: Sidekiq::Client.remove_all_delayed(‘foo’, MyWorker, ‘foo’, 1, :bat => ‘bar’)

Returns the number of jobs removed

This method can be very expensive since it needs to scan through the delayed queues of all timestamps



51
52
53
54
55
56
57
58
59
60
# File 'lib/sidekiq-scheduler/client.rb', line 51

def remove_all_delayed_from_queue(queue, klass, *args)
  count = 0
  item = {'class' => klass.to_s, 'args' => args}
  item['queue'] = queue.to_s if queue
  search = MultiJson.encode(item)
  Array(Sidekiq.redis { |r| r.keys("delayed:*") }).each do |key|
    count += Sidekiq.redis { |r| r.lrem(key, 0, search) }
  end
  count
end

#remove_delayed(timestamp, klass, *args) ⇒ Object

Example usage: Sidekiq::Client.remove_delayed(Time.now + 600, MyWorker, ‘foo’, 1, :bat => ‘bar’)

Returns the number of jobs removed



66
67
68
# File 'lib/sidekiq-scheduler/client.rb', line 66

def remove_delayed(timestamp, klass, *args)
  remove_delayed_from_queue(nil, timestamp, klass, *args)
end

#remove_delayed_from_queue(queue, timestamp, klass, *args) ⇒ Object

Example usage: Sidekiq::Client.remove_delayed(‘foo’, Time.now + 600, MyWorker, ‘foo’, 1, :bat => ‘bar’)

Returns the number of jobs removed



75
76
77
78
79
80
81
82
83
# File 'lib/sidekiq-scheduler/client.rb', line 75

def remove_delayed_from_queue(queue, timestamp, klass, *args)
  timestamp = timestamp.to_i
  item = {'class' => klass.to_s, 'args' => args}
  item['queue'] = queue.to_s if queue
  search = MultiJson.encode(item)
  count = Sidekiq.redis { |r| r.lrem("delayed:#{timestamp}", 0, search) }
  remove_scheduler_queue(timestamp)
  count
end

#remove_scheduler_queue(timestamp) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/sidekiq-scheduler/client.rb', line 25

def remove_scheduler_queue(timestamp)
  key = "delayed:#{timestamp}"
  if 0 == Sidekiq.redis { |r| r.llen(key) }
    Sidekiq.redis { |r| r.del(key) }
    Sidekiq.redis { |r| r.zrem('delayed_queue_schedule', timestamp) }
  end
end