Class: QueueClassicPlus::Base

Inherits:
Object
  • Object
show all
Extended by:
InheritableAttribute
Defined in:
lib/queue_classic_plus/base.rb

Class Method Summary collapse

Methods included from InheritableAttribute

inherit_for, inheritable_attr

Class Method Details

._perform(*args) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/queue_classic_plus/base.rb', line 102

# Runs +perform+ with the given arguments, timing the whole call through
# Metrics.timing under "qu_perform_time" (tagged with +librato_key+). This is
# the method name that enqueue_perform places on the queue.
#
# When +skip_transaction+ is truthy, +perform+ is called directly. Otherwise
# the call is wrapped in +transaction+ and a transaction-local Postgres
# statement timeout is set first.
#
# @param args [Array] arguments forwarded unchanged to +perform+
# @return [Object] whatever Metrics.timing returns for the block
def self._perform(*args)
  Metrics.timing("qu_perform_time", source: librato_key) do
    next perform(*args) if skip_transaction

    transaction do
      # POSTGRES_STATEMENT_TIMEOUT is multiplied by 1000 (presumably seconds
      # to milliseconds). A missing or non-numeric value becomes 0 via
      # .to_i, which means no timeout in postgres.
      timeout_ms = ENV['POSTGRES_STATEMENT_TIMEOUT'].to_i * 1000
      execute "SET LOCAL statement_timeout = #{timeout_ms}"
      perform(*args)
    end
  end
end

.can_enqueue?(method, *args) ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/queue_classic_plus/base.rb', line 55

# Decides whether a job may be added to the queue right now.
#
# A class that is not locked can always enqueue. A locked class may enqueue
# only when queue_classic_jobs has no row with the same queue name, method and
# JSON-serialised arguments that is either unlocked or was locked within the
# last QUEUE_CLASSIC_MAX_LOCK_TIME seconds (600 when the variable is unset).
#
# @param method [String] job method name stored in the +method+ column
# @param args [Array] job arguments; compared through their JSON text
# @return [Boolean] true when enqueueing is allowed
def self.can_enqueue?(method, *args)
  return true unless locked?

  max_lock_seconds = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i

  # The subquery is capped at one row, so the reported count is 0 or 1.
  # The interpolated value is always an Integer because of the .to_i above.
  q = "SELECT COUNT(1) AS count
         FROM
           (
             SELECT 1
             FROM queue_classic_jobs
             WHERE q_name = $1 AND method = $2 AND args::text = $3::text
               AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{max_lock_seconds} seconds')
             LIMIT 1
           )
         AS x"

  row = QC.default_conn_adapter.execute(q, @queue, method, args.to_json)
  row['count'].to_i.zero?
end

.disable_retries! ⇒ Object



31
32
33
34
35
36
37
# File 'lib/queue_classic_plus/base.rb', line 31

# Turns retries off for this job class by setting +disable_retries+ to true.
#
# @raise [RuntimeError] if +retries_on+ already holds entries, i.e. retry! has
#   been used on this class
# @return [true]
def self.disable_retries!
  raise 'disable_retries! should not be enabled in conjunction with retry!' unless retries_on.empty?

  self.disable_retries = true
end

.do(*args) ⇒ Object



96
97
98
99
100
# File 'lib/queue_classic_plus/base.rb', line 96

# Enqueues this job via enqueue_perform, timing the enqueue through
# Metrics.timing under "qc_enqueue_time" (tagged with +librato_key+).
#
# @param args [Array] arguments forwarded unchanged to enqueue_perform
# @return [Object] whatever Metrics.timing returns for the block
def self.do(*args)
  Metrics.timing("qc_enqueue_time", source: librato_key) { enqueue_perform(*args) }
end

.enqueue(method, *args) ⇒ Object



77
78
79
80
81
# File 'lib/queue_classic_plus/base.rb', line 77

# Puts a job on this class's queue, but only when can_enqueue? allows it.
#
# @param method [String] job method name handed to the queue
# @param args [Array] job arguments handed to the queue
# @return [Object, nil] the result of queue.enqueue, or nil when the job was
#   not enqueued
def self.enqueue(method, *args)
  return unless can_enqueue?(method, *args)

  queue.enqueue(method, *args)
end

.enqueue_perform(*args) ⇒ Object



83
84
85
# File 'lib/queue_classic_plus/base.rb', line 83

# Enqueues a call to this class's _perform method, addressed by the string
# "<class name>._perform".
#
# @param args [Array] arguments forwarded unchanged to enqueue
# @return [Object, nil] whatever enqueue returns
def self.enqueue_perform(*args)
  target = "#{self}._perform"
  enqueue(target, *args)
end

.enqueue_perform_in(time, *args) ⇒ Object



87
88
89
90
# File 'lib/queue_classic_plus/base.rb', line 87

# Schedules a call to this class's _perform method through queue.enqueue_in.
# Unlike enqueue_perform, this does not go through can_enqueue?, so locked
# classes are rejected outright.
#
# @param time [Object] delay passed straight to queue.enqueue_in
# @param args [Array] job arguments
# @raise [RuntimeError] when the class is locked
# @return [Object] whatever queue.enqueue_in returns
def self.enqueue_perform_in(time, *args)
  if locked?
    raise "Can't enqueue in the future for locked jobs"
  end

  queue.enqueue_in(time, "#{self}._perform", *args)
end

.librato_key ⇒ Object



117
118
119
# File 'lib/queue_classic_plus/base.rb', line 117

# Builds the metrics source key for this class: the class name run through
# Inflector.underscore, with every "/" replaced by ".". An anonymous class
# (nil name) is treated as an empty name.
#
# @return [String]
def self.librato_key
  class_name = name || ""
  Inflector.underscore(class_name).tr("/", ".")
end

.list ⇒ Object

Debugging



140
141
142
143
# File 'lib/queue_classic_plus/base.rb', line 140

# Debugging helper: selects every row of queue_classic_jobs whose q_name
# equals this class's @queue and returns whatever +execute+ returns.
#
# NOTE(review): the queue name is interpolated into the SQL text rather than
# bound as a parameter, so a name containing a single quote would break the
# statement. Confirm queue names are always developer-controlled.
def self.list
  execute "SELECT * FROM queue_classic_jobs WHERE q_name = '#{@queue}'"
end

.lock! ⇒ Object



39
40
41
# File 'lib/queue_classic_plus/base.rb', line 39

# Marks this job class as locked by setting +locked+ to true. While locked,
# can_enqueue? looks for a matching row in queue_classic_jobs before allowing
# another job, and enqueue_perform_in refuses to schedule.
#
# @return [true]
def self.lock!
  self.locked = true
end

.locked? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/queue_classic_plus/base.rb', line 47

# Reports whether this job class is locked, coercing +locked+ (which may be
# nil when never set) to a strict boolean.
#
# @return [Boolean]
def self.locked?
  locked ? true : false
end

.logger ⇒ Object



51
52
53
# File 'lib/queue_classic_plus/base.rb', line 51

# Returns the logger exposed by QueueClassicPlus.logger; nothing is cached on
# this class.
def self.logger
  QueueClassicPlus.logger
end

.queue ⇒ Object



5
6
7
# File 'lib/queue_classic_plus/base.rb', line 5

# Builds a QC::Queue for the name held in this class's @queue. A new queue
# object is created on every call (no memoization).
def self.queue
  QC::Queue.new(@queue)
end

.restart_in(time, remaining_retries, *args) ⇒ Object



92
93
94
# File 'lib/queue_classic_plus/base.rb', line 92

# Re-schedules a call to this class's _perform method through
# queue.enqueue_retry_in, passing along how many retries are left.
#
# @param time [Object] delay passed straight to queue.enqueue_retry_in
# @param remaining_retries [Object] retry budget passed through unchanged
# @param args [Array] job arguments
# @return [Object] whatever queue.enqueue_retry_in returns
def self.restart_in(time, remaining_retries, *args)
  job_method = "#{self}._perform"
  queue.enqueue_retry_in(time, job_method, remaining_retries, *args)
end

.retries_on?(exception) ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/queue_classic_plus/base.rb', line 27

# Tells whether the given exception should trigger a retry: either its exact
# class is registered in +retries_on+, or it is an instance of (a subclass
# of) any registered class.
#
# @param exception [Exception] the error raised by the job
# @return [Boolean] (the stored value is returned on an exact class match;
#   retry! stores true)
def self.retries_on?(exception)
  registered = retries_on
  exact_match = registered[exception.class]
  return exact_match if exact_match

  registered.each_key.any? { |klass| exception.is_a?(klass) }
end

.retry!(on: RuntimeError, max: 5) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/queue_classic_plus/base.rb', line 19

# Registers the exception classes that should cause this job to be retried
# and sets how many retries are allowed.
#
# @param on [Class, Array<Class>] one exception class or a list of them; each
#   is stored as a key in +retries_on+
# @param max [Integer] value assigned to +max_retries+ (every call overwrites
#   the previous value)
# @raise [RuntimeError] if +disable_retries+ is set for this class
# @return [Integer] the +max+ value that was assigned
def self.retry!(on: RuntimeError, max: 5)
  if disable_retries
    raise 'retry! should not be used in conjunction with disable_retries!'
  end

  # Array() lets callers pass either a single class or an array of classes.
  Array(on).each { |error_class| retries_on[error_class] = true }
  self.max_retries = max
end

.skip_transaction! ⇒ Object



43
44
45
# File 'lib/queue_classic_plus/base.rb', line 43

# Sets +skip_transaction+ to true, so that _perform calls perform directly
# instead of wrapping it in transaction.
#
# @return [true]
def self.skip_transaction!
  self.skip_transaction = true
end

.transaction(options = {}, &block) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/queue_classic_plus/base.rb', line 121

# Runs the given block inside a database transaction.
#
# @param options [Hash] transaction options (symbol keys); only used when
#   ActiveRecord is loaded, where they are forwarded as keyword arguments
# @yield the work to perform inside the transaction
# @raise re-raises any StandardError from the block after rolling back
# @return [Object] the result of ActiveRecord::Base.transaction when
#   ActiveRecord is loaded, otherwise the result of executing "COMMIT"
def self.transaction(options = {}, &block)
  if defined?(ActiveRecord)
    # If ActiveRecord is loaded, we use its own transaction mechanism since
    # it has slightly different semantics for rollback.
    # The options are splatted as keywords: on Ruby 3 a positional Hash is no
    # longer converted to keyword arguments, so passing it positionally to a
    # keyword-only `transaction` raises ArgumentError.
    ActiveRecord::Base.transaction(**options, &block)
  else
    begin
      execute "BEGIN"
      block.call
    rescue
      # Bare rescue: only StandardError and its subclasses trigger ROLLBACK.
      execute "ROLLBACK"
      raise
    end

    execute "COMMIT"
  end
end