Class: Sidekiq::Throttled::QueuesPauser

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/sidekiq/throttled/queues_pauser.rb

Overview

Singleton class used to pause queues from being processed. For the sake of efficiency, it uses the Communicator behind the scenes to notify all processes about paused/resumed queues.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueuesPauser

Initializes singleton instance.



39
40
41
42
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 39

# Initializes singleton instance.
#
# Sets up the two pieces of state used by the other methods:
# the shared Communicator instance and an in-memory Set of paused
# queue names, which starts out empty.
def initialize
  @communicator  = Communicator.instance
  @paused_queues = Set.new
end

Instance Method Details

#filter(queues) ⇒ Array<String>

Returns the list of queues with paused queues stripped out.



70
71
72
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 70

# Returns the given queues with every locally known paused queue removed.
#
# Only the in-memory `@paused_queues` set is consulted; Redis is not
# queried here.
#
# @param queues [Array<String>] queue names to filter
#   (NOTE(review): `setup!` stores names produced by `QueueName.expand`,
#   so callers presumably pass names in that same form -- confirm)
# @return [Array<String>] new array, original order and duplicates preserved
def filter(queues)
  queues.reject { |name| @paused_queues.include?(name) }
end

#pause!(queue) ⇒ void

This method returns an undefined value.

Pauses given `queue`.



85
86
87
88
89
90
91
92
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 85

# Pauses given `queue`.
#
# The normalized queue name is added to the PAUSED_QUEUES Redis set and
# a PAUSE_MESSAGE carrying that name is then transmitted through the
# communicator over the same connection.
#
# @param queue [#to_s] queue name; normalized with `QueueName.normalize`
# @return [void]
def pause!(queue)
  name = QueueName.normalize(queue.to_s)

  Sidekiq.redis do |redis|
    redis.sadd(PAUSED_QUEUES, name)
    @communicator.transmit(redis, PAUSE_MESSAGE, name)
  end
end

#paused?(queue) ⇒ Boolean

Checks if given `queue` is paused.



98
99
100
101
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 98

# Checks if given `queue` is paused.
#
# Asks Redis whether the normalized queue name is a member of the
# PAUSED_QUEUES set (the in-memory set is not consulted).
#
# @param queue [#to_s] queue name; normalized with `QueueName.normalize`
# @return [Boolean] strictly `true` or `false`
def paused?(queue)
  queue = QueueName.normalize queue.to_s
  reply = Sidekiq.redis { |conn| conn.sismember(PAUSED_QUEUES, queue) }

  # Depending on the Redis client in use, SISMEMBER may come back as
  # true/false or as the raw Integer 1/0. Integer 0 is truthy in Ruby,
  # so the reply is compared explicitly instead of being returned as-is.
  reply == true || reply == 1
end

#paused_queues ⇒ Array<String>

Returns list of paused queues.



77
78
79
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 77

# Returns list of paused queues.
#
# Reads the members of the PAUSED_QUEUES Redis set on every call and
# converts the reply to an Array.
#
# @return [Array<String>] paused queue names as stored in Redis
def paused_queues
  members = Sidekiq.redis { |redis| redis.smembers(PAUSED_QUEUES) }
  members.to_a
end

#resume!(queue) ⇒ void

This method returns an undefined value.

Resumes given `queue`.



107
108
109
110
111
112
113
114
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 107

# Resumes given `queue`.
#
# The normalized queue name is removed from the PAUSED_QUEUES Redis set
# and a RESUME_MESSAGE carrying that name is then transmitted through
# the communicator over the same connection.
#
# @param queue [#to_s] queue name; normalized with `QueueName.normalize`
# @return [void]
def resume!(queue)
  name = QueueName.normalize(queue.to_s)

  Sidekiq.redis do |redis|
    redis.srem(PAUSED_QUEUES, name)
    @communicator.transmit(redis, RESUME_MESSAGE, name)
  end
end

#setup! ⇒ void

This method returns an undefined value.

Configures the Sidekiq server to keep an up-to-date list of paused queues.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/sidekiq/throttled/queues_pauser.rb', line 48

# Configures Sidekiq server to keep the in-memory list of paused queues
# current.
#
# Applies `Patches::Queue`, then, inside `Sidekiq.configure_server`,
# registers three communicator hooks:
#
# * PAUSE_MESSAGE  -- adds the expanded queue name to `@paused_queues`
# * RESUME_MESSAGE -- removes the expanded queue name from it
# * ready          -- replaces the whole set with the names currently
#                     returned by `#paused_queues`, expanded
#
# @return [void]
def setup!
  Patches::Queue.apply!

  Sidekiq.configure_server do
    @communicator.receive(PAUSE_MESSAGE) { |name| @paused_queues.add(QueueName.expand(name)) }
    @communicator.receive(RESUME_MESSAGE) { |name| @paused_queues.delete(QueueName.expand(name)) }

    @communicator.ready do
      # Re-sync local state from `#paused_queues`, discarding whatever
      # the set held before.
      persisted = paused_queues.map { |name| QueueName.expand(name) }
      @paused_queues.replace(persisted)
    end
  end
end