Class: SuckerPunch::Queue

Inherits:
Concurrent::Synchronization::LockableObject
  • Object
show all
Extended by:
Forwardable
Includes:
Concurrent::ExecutorService
Defined in:
lib/sucker_punch/queue.rb,
lib/sucker_punch/testing.rb

Constant Summary collapse

DEFAULT_MAX_QUEUE_SIZE =

Unlimited

0
DEFAULT_EXECUTOR_OPTIONS =
{
  min_threads:     2,
  max_threads:     2,
  idletime:        60, # 1 minute
  auto_terminate:  false # Let shutdown modes handle thread termination
}.freeze
QUEUES =
Concurrent::Map.new
PAUSE_TIME =
STDOUT.tty? ? 0.1 : 0.5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, pool) ⇒ Queue


134
135
136
137
138
# File 'lib/sucker_punch/queue.rb', line 134

# Wraps an existing pool under the given queue name.
#
# @param name the queue's name, kept in @name
# @param pool the pool this queue forwards work to, kept in @pool
def initialize(name, pool)
  # Explicit empty parentheses: the parent initializer is called with no arguments.
  super()
  @name = name
  @pool = pool
  # A freshly built queue accepts work; #shutdown is what sets this to false.
  @running = true
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name


112
113
114
# File 'lib/sucker_punch/queue.rb', line 112

# Reader for the queue's name.
#
# @return the value held in @name (assigned in #initialize)
def name
  @name
end

Class Method Details

.all ⇒ Object


33
34
35
36
37
38
39
# File 'lib/sucker_punch/queue.rb', line 33

# Builds one queue wrapper per entry currently registered in QUEUES.
#
# @return [Concurrent::Array] a new array of queue instances, one for each
#   (name, pool) pair yielded by QUEUES
def self.all
  Concurrent::Array.new.tap do |wrapped|
    QUEUES.each_pair { |queue_name, executor| wrapped << new(queue_name, executor) }
  end
end

.clear ⇒ Object


41
42
43
44
45
46
47
48
49
# File 'lib/sucker_punch/queue.rb', line 41

# Empties the queue registry, resets the Busy/Processed/Failed counters and
# kills every queue that was registered beforehand.
#
# Susceptible to race conditions -- only use in testing.
#
# @return the collection of previously registered queues (result of #each)
def self.clear
  # Take the snapshot first: once QUEUES is emptied the pools can no longer
  # be reached through .all, yet they still have to be killed.
  registered = all
  QUEUES.clear

  SuckerPunch::Counter::Busy.clear
  SuckerPunch::Counter::Processed.clear
  SuckerPunch::Counter::Failed.clear

  registered.each(&:kill)
end

.find_or_create(name, num_workers = 2, num_jobs_max = nil) ⇒ Object


19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sucker_punch/queue.rb', line 19

# Returns a queue wrapper for +name+, creating and registering its pool in
# QUEUES on first use.
#
# The pool settings are only applied when the pool is created; when an entry
# for +name+ already exists the block is not needed and the stored pool is
# reused as-is.
#
# @param name key under which the pool is stored in QUEUES
# @param num_workers used for both min_threads and max_threads
# @param num_jobs_max max_queue for the pool; DEFAULT_MAX_QUEUE_SIZE when nil
# @return a new queue instance wrapping the stored pool
def self.find_or_create(name, num_workers = 2, num_jobs_max = nil)
  executor = QUEUES.fetch_or_store(name) do
    overrides = {
      min_threads: num_workers,
      max_threads: num_workers,
      max_queue: num_jobs_max || DEFAULT_MAX_QUEUE_SIZE
    }
    Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(overrides))
  end

  new(name, executor)
end

.shutdown_all ⇒ Object


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/sucker_punch/queue.rb', line 74

# Shuts down every registered queue, waiting up to
# SuckerPunch.shutdown_timeout seconds for them to go idle before killing
# whatever is left.
#
# Only the caller for which SuckerPunch::RUNNING.make_false returns a truthy
# value performs the shutdown; any other call returns nil immediately.
#
# @return [nil] when nothing had to be done or every queue became idle in time;
#   otherwise the collection of queues that were killed
def self.shutdown_all
  # Measure the timeout on the monotonic clock so that wall-clock adjustments
  # during shutdown can neither stretch nor shorten the wait.
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = monotonic_now.call + SuckerPunch.shutdown_timeout

  return unless SuckerPunch::RUNNING.make_false

  # If a job is enqueued right before the script exits
  # (command line, rake task, etc.), the system needs an
  # interval to allow the enqueued jobs to make it into the system,
  # otherwise the queue will look idle
  sleep PAUSE_TIME

  queues = all

  # Issue shutdown to each queue and let them wrap up their work. This
  # prevents new jobs from being enqueued and lets the pool clean up
  # after itself
  queues.each { |queue| queue.shutdown }

  # return if every queue is empty and workers in every queue are idle
  return if queues.all? { |queue| queue.idle? }

  SuckerPunch.logger.info("Pausing to allow workers to finish...")

  remaining = deadline - monotonic_now.call

  # Keep polling the queues for idleness while respecting the shutdown
  # timeout
  while remaining > PAUSE_TIME
    return if queues.all? { |queue| queue.idle? }
    sleep PAUSE_TIME
    remaining = deadline - monotonic_now.call
  end

  # Queues haven't finished work. Aggressively kill them.
  SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
  queues.each { |queue| queue.kill }
end

.stats ⇒ Object


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/sucker_punch/queue.rb', line 51

# Collects worker and job counts for every queue returned by .all.
#
# @return [Hash] queue name => { "workers" => { "total", "busy", "idle" },
#   "jobs" => { "processed", "failed", "enqueued" } }
def self.stats
  all.each_with_object({}) do |queue, report|
    queue_name = queue.name

    worker_counts = {
      "total" => queue.total_workers,
      "busy" => queue.busy_workers,
      "idle" => queue.idle_workers
    }
    job_counts = {
      "processed" => queue.processed_jobs,
      "failed" => queue.failed_jobs,
      "enqueued" => queue.enqueued_jobs
    }

    report[queue_name] = { "workers" => worker_counts, "jobs" => job_counts }
  end
end

Instance Method Details

#==(other) ⇒ Object


148
149
150
# File 'lib/sucker_punch/queue.rb', line 148

# Two queues are equal when they wrap equal pools.
#
# @param other [Object] the object to compare with
# @return [Boolean] false for anything that is not a SuckerPunch::Queue;
#   otherwise the result of comparing the two pools
def ==(other)
  # Guard first: a non-queue operand (nil, a String, ...) is simply unequal.
  # Without the guard the comparison raised NoMethodError on `other.pool`.
  other.is_a?(SuckerPunch::Queue) && pool == other.pool
end

#busy_workers ⇒ Object


152
153
154
# File 'lib/sucker_punch/queue.rb', line 152

# Current value of the Busy counter kept for this queue's name.
#
# @return the counter's value
def busy_workers
  busy_counter = SuckerPunch::Counter::Busy.new(name)
  busy_counter.value
end

#failed_jobs ⇒ Object


164
165
166
# File 'lib/sucker_punch/queue.rb', line 164

# Current value of the Failed counter kept for this queue's name.
#
# @return the counter's value
def failed_jobs
  failed_counter = SuckerPunch::Counter::Failed.new(name)
  failed_counter.value
end

#idle? ⇒ Boolean


144
145
146
# File 'lib/sucker_punch/queue.rb', line 144

# Whether the queue has nothing waiting and nothing in progress.
#
# @return [Boolean] true only when both enqueued_jobs and busy_workers equal 0
def idle?
  # Pending jobs alone are enough to rule out idleness; the busy count is
  # only consulted when nothing is enqueued.
  return false unless enqueued_jobs == 0

  busy_workers == 0
end

#idle_workers ⇒ Object


156
157
158
# File 'lib/sucker_punch/queue.rb', line 156

# Number of workers that are not currently busy.
#
# @return total_workers minus busy_workers
def idle_workers
  total = total_workers
  busy = busy_workers
  total - busy
end

#kill ⇒ Object


178
179
180
# File 'lib/sucker_punch/queue.rb', line 178

# Forwards +kill+ to the wrapped pool.
#
# @return whatever the pool's +kill+ returns
def kill
  @pool.kill
end

#post(*args, &block) ⇒ Object


168
169
170
171
172
173
174
175
176
# File 'lib/sucker_punch/queue.rb', line 168

# Hands a unit of work to the wrapped pool, provided the queue is still
# running.
#
# The running check and the hand-off both happen inside #synchronize.
#
# @param args arguments forwarded unchanged to the pool's +post+
# @param block the work to run, forwarded unchanged to the pool's +post+
# @return false when the queue is no longer running; otherwise the result of
#   the pool's +post+
def post(*args, &block)
  synchronize do
    # `next` makes false the block's value without leaving the method early.
    next false unless @running

    @pool.post(*args, &block)
  end
end

#processed_jobs ⇒ Object


160
161
162
# File 'lib/sucker_punch/queue.rb', line 160

# Current value of the Processed counter kept for this queue's name.
#
# @return the counter's value
def processed_jobs
  processed_counter = SuckerPunch::Counter::Processed.new(name)
  processed_counter.value
end

#running? ⇒ Boolean


140
141
142
# File 'lib/sucker_punch/queue.rb', line 140

# Whether this queue still accepts work.
#
# @return [Boolean] the @running flag, read inside #synchronize
def running?
  synchronize do
    @running
  end
end

#shutdown ⇒ Object


182
183
184
185
# File 'lib/sucker_punch/queue.rb', line 182

# Stops this queue from accepting work and asks the wrapped pool to shut down.
#
# @return whatever the pool's +shutdown+ returns
def shutdown
  # Clear the flag inside #synchronize, the same guard #post uses when it
  # reads the flag.
  synchronize do
    @running = false
  end

  # The pool itself is shut down after the synchronized section has ended.
  @pool.shutdown
end