Class: SuckerPunch::Queue

Inherits:
Concurrent::Synchronization::LockableObject
  • Object
show all
Extended by:
Forwardable
Includes:
Concurrent::ExecutorService
Defined in:
lib/sucker_punch/queue.rb,
lib/sucker_punch/testing.rb

Constant Summary collapse

DEFAULT_MAX_QUEUE_SIZE =
0 # Unlimited
DEFAULT_EXECUTOR_OPTIONS =
{
  min_threads:     2,
  max_threads:     2,
  idletime:        60, # 1 minute
  auto_terminate:  false # Let shutdown modes handle thread termination
}.freeze
QUEUES =
Concurrent::Map.new
PAUSE_TIME =
STDOUT.tty? ? 0.1 : 0.5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, pool) ⇒ Queue

Returns a new instance of Queue.



147
148
149
150
151
# File 'lib/sucker_punch/queue.rb', line 147

# Wraps an executor pool under a queue name and marks the queue as running.
#
# @param name identifier of the queue (the key used in QUEUES by .find_or_create)
# @param pool executor that #post, #shutdown and #kill delegate to
def initialize(name, pool)
  # Explicit empty argument list: the parent initializer receives no arguments.
  super()
  @name = name
  @pool = pool
  @running = true
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



125
126
127
# File 'lib/sucker_punch/queue.rb', line 125

# Reader for the queue's name.
#
# @return the value stored in @name by #initialize
def name
  @name
end

Class Method Details

.all ⇒ Object



33
34
35
36
37
38
39
# File 'lib/sucker_punch/queue.rb', line 33

# Builds a fresh wrapper instance for every name/pool pair currently held in
# QUEUES.
#
# @return [Concurrent::Array] one new instance per registered pool
def self.all
  Concurrent::Array.new.tap do |wrappers|
    QUEUES.each_pair { |queue_name, executor| wrappers << new(queue_name, executor) }
  end
end

.clear ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'lib/sucker_punch/queue.rb', line 41

# Empties the queue registry and the busy/processed/failed counters, then
# kills every queue that was registered beforehand.
#
# Susceptible to race conditions -- only use in testing.
#
# @return the collection of queues that were killed
def self.clear
  # Snapshot first: once QUEUES is cleared the pools can no longer be reached.
  snapshot = all
  QUEUES.clear

  [
    SuckerPunch::Counter::Busy,
    SuckerPunch::Counter::Processed,
    SuckerPunch::Counter::Failed
  ].each(&:clear)

  snapshot.each(&:kill)
end

.find_or_create(name, num_workers = 2, num_jobs_max = nil) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sucker_punch/queue.rb', line 19

# Returns a queue wrapper for +name+, creating and registering a thread pool
# in QUEUES when none is stored under that name yet.
#
# The sizing arguments are only consulted inside the creation block, so they
# have no effect when a pool is already registered for +name+.
#
# @param name key the pool is stored under in QUEUES
# @param num_workers used for both min_threads and max_threads of a new pool
# @param num_jobs_max max_queue of a new pool; nil falls back to
#   DEFAULT_MAX_QUEUE_SIZE
# @return a new wrapper instance around the stored pool
def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
  executor = QUEUES.fetch_or_store(name) do
    overrides = {
      min_threads: num_workers,
      max_threads: num_workers,
      max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
    }
    Concurrent::ThreadPoolExecutor.new(**DEFAULT_EXECUTOR_OPTIONS.merge(overrides))
  end

  new(name, executor)
end

.shutdown_all ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/sucker_punch/queue.rb', line 74

# Shuts every registered queue down, giving in-flight work up to
# SuckerPunch.shutdown_timeout seconds to finish before killing the pools.
#
# The shutdown sequence only runs when SuckerPunch::RUNNING.make_false returns
# a truthy value; otherwise the method returns nil without touching any queue.
#
# @return [nil] when nothing was done or every queue became idle in time
# @return the queue collection when the remaining queues had to be killed
def self.shutdown_all
  # Measure the timeout on the monotonic clock: wall-clock adjustments (NTP,
  # DST, manual changes) must not stretch or cut short the grace period.
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = now.call + SuckerPunch.shutdown_timeout

  return unless SuckerPunch::RUNNING.make_false

  # If a job is enqueued right before the script exits
  # (command line, rake task, etc.), the system needs an
  # interval to allow the enqueued jobs to make it into the system,
  # otherwise the queue will look idle
  sleep PAUSE_TIME

  queues = all

  # Issue shutdown to each queue and let them wrap up their work. This
  # prevents new jobs from being enqueued and lets the pool clean up
  # after itself
  queues.each(&:shutdown)

  # return if every queue is empty and workers in every queue are idle
  return if queues.all?(&:idle?)

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  # Poll until everything is idle or there is no room left for another pause.
  # The idle check runs after every sleep (including the last one) so queues
  # that finished during the final pause are not reported as unfinished.
  loop do
    return if queues.all?(&:idle?)
    break unless deadline - now.call > PAUSE_TIME

    sleep PAUSE_TIME
  end

  # Queues haven't finished work. Aggressively kill them.
  SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
  queues.each(&:kill)
end

.stats ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/sucker_punch/queue.rb', line 51

# Collects worker and job counts for every registered queue.
#
# @return [Hash] queue name => { "workers" => { "total", "busy", "idle" },
#   "jobs" => { "processed", "failed", "enqueued" } }
def self.stats
  all.each_with_object({}) do |queue, report|
    queue_name = queue.name

    worker_counts = {
      "total" => queue.total_workers,
      "busy" => queue.busy_workers,
      "idle" => queue.idle_workers
    }
    job_counts = {
      "processed" => queue.processed_jobs,
      "failed" => queue.failed_jobs,
      "enqueued" => queue.enqueued_jobs
    }

    report[queue_name] = { "workers" => worker_counts, "jobs" => job_counts }
  end
end

.wait ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/sucker_punch/queue.rb', line 112

# Blocks until every queue registered at call time reports idle?, polling
# every PAUSE_TIME seconds. There is no timeout.
#
# @return [nil]
def self.wait
  pending = all

  # Nothing to wait for: every queue is empty and all of its workers are idle.
  return if pending.all?(&:idle?)

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  sleep PAUSE_TIME until pending.all?(&:idle?)
end

Instance Method Details

#==(other) ⇒ Object



161
162
163
# File 'lib/sucker_punch/queue.rb', line 161

# Two queues are equal when they wrap the same pool.
#
# Objects that are not instances of this class (nil, strings, ...) compare as
# not equal instead of raising NoMethodError on the missing #pool reader.
#
# @param other [Object] object to compare against
# @return [Boolean]
def ==(other)
  other.is_a?(self.class) && pool == other.pool
end

#busy_workers ⇒ Object



165
166
167
# File 'lib/sucker_punch/queue.rb', line 165

# Current value of the Busy counter keyed by this queue's name.
#
# @return the counter's value
def busy_workers
  busy_counter = SuckerPunch::Counter::Busy.new(name)
  busy_counter.value
end

#failed_jobs ⇒ Object



177
178
179
# File 'lib/sucker_punch/queue.rb', line 177

# Current value of the Failed counter keyed by this queue's name.
#
# @return the counter's value
def failed_jobs
  failed_counter = SuckerPunch::Counter::Failed.new(name)
  failed_counter.value
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/sucker_punch/queue.rb', line 157

# A queue is idle when nothing is enqueued and no worker is busy.
#
# @return [Boolean]
def idle?
  # Pending jobs alone make the queue non-idle; the busy count is not consulted.
  return false unless enqueued_jobs == 0

  busy_workers == 0
end

#idle_workers ⇒ Object



169
170
171
# File 'lib/sucker_punch/queue.rb', line 169

# Number of workers that are not currently busy.
#
# @return total_workers minus busy_workers
def idle_workers
  total = total_workers
  total - busy_workers
end

#kill ⇒ Object



192
193
194
# File 'lib/sucker_punch/queue.rb', line 192

# Forwards to the underlying pool's #kill and returns whatever the pool
# returns. Unlike #shutdown, this does not change the @running flag.
def kill
  @pool.kill
end

#post(*args, &block) ⇒ Object



181
182
183
184
185
186
187
188
189
# File 'lib/sucker_punch/queue.rb', line 181

# Hands a job to the underlying pool while the queue is still running.
#
# The running check and the hand-off happen inside the same synchronize
# block.
#
# @param args arguments forwarded to the pool's #post
# @param block the job forwarded to the pool's #post
# @return [false] when the queue is no longer running
# @return the pool's #post result otherwise
def post(*args, &block)
  synchronize do
    next false unless @running

    @pool.post(*args, &block)
  end
end

#processed_jobs ⇒ Object



173
174
175
# File 'lib/sucker_punch/queue.rb', line 173

# Current value of the Processed counter keyed by this queue's name.
#
# @return the counter's value
def processed_jobs
  processed_counter = SuckerPunch::Counter::Processed.new(name)
  processed_counter.value
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


153
154
155
# File 'lib/sucker_punch/queue.rb', line 153

# Reads the running flag under the object's lock.
#
# @return [Boolean] true from construction until #shutdown sets it to false
def running?
  synchronize do
    @running
  end
end

#shutdown ⇒ Object



196
197
198
199
# File 'lib/sucker_punch/queue.rb', line 196

# Stops accepting work through #post, then asks the underlying pool to shut
# down.
#
# The flag is cleared under the lock; the pool call happens after the lock
# has been released.
#
# @return whatever the pool's #shutdown returns
def shutdown
  synchronize do
    @running = false
  end

  @pool.shutdown
end