Class: SuckerPunch::Queue

Inherits:
Concurrent::Synchronization::LockableObject
  • Object
show all
Extended by:
Forwardable
Includes:
Concurrent::ExecutorService
Defined in:
lib/sucker_punch/queue.rb

Constant Summary collapse

DEFAULT_EXECUTOR_OPTIONS =
{
  min_threads:     2,
  max_threads:     2,
  idletime:        60, # 1 minute
  max_queue:       0, # unlimited
  auto_terminate:  false # Let shutdown modes handle thread termination
}.freeze
QUEUES =
Concurrent::Map.new
PAUSE_TIME =
STDOUT.tty? ? 0.1 : 0.5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, pool) ⇒ Queue

Returns a new instance of Queue.



130
131
132
133
134
# File 'lib/sucker_punch/queue.rb', line 130

def initialize(name, pool)
  super()
  @running = true
  @name, @pool = name, pool
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



109
110
111
# File 'lib/sucker_punch/queue.rb', line 109

def name
  @name
end

Class Method Details

.allObject



30
31
32
33
34
35
36
# File 'lib/sucker_punch/queue.rb', line 30

def self.all
  queues = Concurrent::Array.new
  QUEUES.each_pair do |name, pool|
    queues.push new(name, pool)
  end
  queues
end

.clearObject



38
39
40
41
42
43
44
45
46
# File 'lib/sucker_punch/queue.rb', line 38

def self.clear
  # susceptible to race conditions--only use in testing
  old = all
  QUEUES.clear
  SuckerPunch::Counter::Busy.clear
  SuckerPunch::Counter::Processed.clear
  SuckerPunch::Counter::Failed.clear
  old.each { |queue| queue.kill }
end

.find_or_create(name, num_workers = 2) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sucker_punch/queue.rb', line 18

def self.find_or_create(name, num_workers = 2)
  pool = QUEUES.fetch_or_store(name) do
    options = DEFAULT_EXECUTOR_OPTIONS.merge({
      min_threads: num_workers,
      max_threads: num_workers
    })
    Concurrent::ThreadPoolExecutor.new(options)
  end

  new(name, pool)
end

.shutdown_allObject



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/sucker_punch/queue.rb', line 71

def self.shutdown_all
  deadline = Time.now + SuckerPunch.shutdown_timeout

  if SuckerPunch::RUNNING.make_false
    # If a job is enqueued right before the script exits
    # (command line, rake task, etc.), the system needs an
    # interval to allow the enqueue jobs to make it in to the system
    # otherwise the queue will look idle
    sleep PAUSE_TIME

    queues = all

    # Issue shutdown to each queue and let them wrap up their work. This
    # prevents new jobs from being enqueued and lets the pool clean up
    # after itself
    queues.each { |queue| queue.shutdown }

    # return if every queue is empty and workers in every queue are idle
    return if queues.all? { |queue| queue.idle? }

    SuckerPunch.logger.info("Pausing to allow workers to finish...")

    remaining = deadline - Time.now

    # Continue to loop through each queue and test if it's idle, while
    # respecting the shutdown timeout
    while remaining > PAUSE_TIME
      return if queues.all? { |queue| queue.idle? }
      sleep PAUSE_TIME
      remaining = deadline - Time.now
    end

    # Queues haven't finished work. Aggressively kill them.
    SuckerPunch.logger.warn("Queued jobs didn't finish before shutdown_timeout...killing remaining jobs")
    queues.each { |queue| queue.kill }
  end
end

.statsObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sucker_punch/queue.rb', line 48

def self.stats
  queues = {}

  all.each do |queue|
    queues[queue.name] = {
      "workers" => {
        "total" => queue.total_workers,
        "busy" => queue.busy_workers,
        "idle" => queue.idle_workers,
      },
      "jobs" => {
        "processed" => queue.processed_jobs,
        "failed" => queue.failed_jobs,
        "enqueued" => queue.enqueued_jobs,
      }
    }
  end

  queues
end

Instance Method Details

#==(other) ⇒ Object



144
145
146
# File 'lib/sucker_punch/queue.rb', line 144

def ==(other)
  pool == other.pool
end

#busy_workersObject



148
149
150
# File 'lib/sucker_punch/queue.rb', line 148

def busy_workers
  SuckerPunch::Counter::Busy.new(name).value
end

#failed_jobsObject



160
161
162
# File 'lib/sucker_punch/queue.rb', line 160

def failed_jobs
  SuckerPunch::Counter::Failed.new(name).value
end

#idle?Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/sucker_punch/queue.rb', line 140

def idle?
  enqueued_jobs == 0 && busy_workers == 0
end

#idle_workersObject



152
153
154
# File 'lib/sucker_punch/queue.rb', line 152

def idle_workers
  total_workers - busy_workers
end

#killObject



174
175
176
# File 'lib/sucker_punch/queue.rb', line 174

def kill
  @pool.kill
end

#post(*args, &block) ⇒ Object



164
165
166
167
168
169
170
171
172
# File 'lib/sucker_punch/queue.rb', line 164

def post(*args, &block)
  synchronize do
    if @running
      @pool.post(*args, &block)
    else
      false
    end
  end
end

#processed_jobsObject



156
157
158
# File 'lib/sucker_punch/queue.rb', line 156

def processed_jobs
  SuckerPunch::Counter::Processed.new(name).value
end

#running?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/sucker_punch/queue.rb', line 136

def running?
  synchronize { @running }
end

#shutdownObject



178
179
180
181
# File 'lib/sucker_punch/queue.rb', line 178

def shutdown
  synchronize { @running = false }
  @pool.shutdown
end