Class: Worker::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/worker/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Queue

Returns a new instance of Queue.


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/worker/queue.rb', line 10

def initialize(opts={})
  @logger = Logger.new(STDOUT)

  db_config = opts[:db_config] || Worker.db
  db_config.merge(logger: logger) if opts[:log_queries]

  @db = Sequel.connect(db_config)

  name = `hostname`.chomp("\n")

  @id = "#{name}.#{Process.pid}.#{rand(0..1000)}-#{opts[:queue_name] || 'all'}"

  @jobs_per_worker = opts[:jobs_per_worker] || 5
  @max_job_time = opts[:max_job_time] || 5 * 60 # in seconds
  @queue_name = opts[:queue_name]
  @max_attempts = opts[:max_attempts] || 20
  @poll_interval = opts[:poll_interval] || 5 # 5 second polling
  @log_backtrace = !!opts[:log_backtrace]

  @db_adapter = db_config[:adapter]

  create_table
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id


7
8
9
# File 'lib/worker/queue.rb', line 7

def id
  @id
end

#jobs_per_workerObject

Returns the value of attribute jobs_per_worker


8
9
10
# File 'lib/worker/queue.rb', line 8

def jobs_per_worker
  @jobs_per_worker
end

#log_backtraceObject

Returns the value of attribute log_backtrace


8
9
10
# File 'lib/worker/queue.rb', line 8

def log_backtrace
  @log_backtrace
end

#loggerObject

Returns the value of attribute logger


8
9
10
# File 'lib/worker/queue.rb', line 8

def logger
  @logger
end

#max_attemptsObject

Returns the value of attribute max_attempts


8
9
10
# File 'lib/worker/queue.rb', line 8

def max_attempts
  @max_attempts
end

#max_job_timeObject

Returns the value of attribute max_job_time


8
9
10
# File 'lib/worker/queue.rb', line 8

def max_job_time
  @max_job_time
end

#poll_intervalObject

Returns the value of attribute poll_interval


8
9
10
# File 'lib/worker/queue.rb', line 8

def poll_interval
  @poll_interval
end

#queue_nameObject

Returns the value of attribute queue_name


8
9
10
# File 'lib/worker/queue.rb', line 8

def queue_name
  @queue_name
end

Instance Method Details

#clean_locksObject

cleans locked by jobs that have been dormant for a while


60
61
62
# File 'lib/worker/queue.rb', line 60

def clean_locks
  db[:worker_jobs].where('locked_at < ?', Time.now - max_job_time).update(locked_by: nil)
end

#create_tableObject


34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/worker/queue.rb', line 34

def create_table
  db.create_table? :worker_jobs do
    primary_key :id
    Time     :locked_at,  null: true
    Time     :run_at,     null: true
    String   :locked_by,  null: true
    String   :handler,    null: false, text: true
    String   :last_error, null: true,  text: true
    String   :queue,      null: false, default: 'default'
    String   :job_class,  null: false
    Integer  :attempts,   null: false, default: 0
    Integer  :priority,   null: false, default: 0
  end
end

#dbObject


102
103
104
# File 'lib/worker/queue.rb', line 102

def db
  @db
end

#dequeueObject

locks a set of jobs for this worker


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/worker/queue.rb', line 65

def dequeue

  if @db_adapter == 'mysql2' || @db_adapter == 'mysql'
    query = db[:worker_jobs]
                .limit(jobs_per_worker)
                .where('`run_at` < ?', Time.now)
                .where('`locked_by` IS NULL')
                .reverse_order('priority')
  else
    query = db[:worker_jobs]
                .where(
                    '`run_at` < ? AND `locked_by` IS NULL AND `id` in (SELECT `id` FROM `worker_jobs` ORDER BY `priority` DESC LIMIT ?)',
                    Time.now, jobs_per_worker)
  end

  if queue_name
    query = query.where(queue: queue_name)
  end

  query.update(locked_by: id, locked_at: Time.now)
  db[:worker_jobs].where(locked_by: id)
end

#enqueue(job) ⇒ Object


88
89
90
91
92
93
94
95
96
# File 'lib/worker/queue.rb', line 88

def enqueue(job)
  db[:worker_jobs].insert(
      job_class: job.class.name,
      queue: job.queue,
      priority: job.priority,
      handler: job.serialize,
      run_at: job.run_at,
  )
end

#remove(id) ⇒ Object


55
56
57
# File 'lib/worker/queue.rb', line 55

def remove(id)
  db[:worker_jobs].where(id: id).delete
end

#run(number_to_run = nil) ⇒ Object


106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/worker/queue.rb', line 106

def run(number_to_run=nil)
  begin
    while true

      logger.debug("Polling")

      t1 = Time.now
      failures = 0
      successes = 0

      dequeue.each do |item|

        if number_to_run
          number_to_run -= 1
        end

        logger.info("Performing #{item[:job_class]}.#{item[:id]} attempts=#{item[:attempts]} priority=#{item[:priority]} run_at=#{item[:run_at]}")

        job = nil
        begin
          job = eval(item[:job_class]).deserialize(item[:handler])

          Timeout::timeout(max_job_time) do
            job.before
            job.perform
            job.on_success
          end

          remove(item[:id])

          successes += 1
          logger.info("Successful #{item[:job_class]}.#{item[:id]}")

        rescue Exception => e

          job.on_failure if job

          if item[:attempts] + 1 > max_attempts
            remove(item[:id])
          else
            set_error(item, e)
          end

          failures += 1
          logger.info("Failed #{item[:job_class]}.#{item[:id]} err: #{e.class.name} #{e.message}")
          if log_backtrace
            puts e.backtrace
          end

        end
      end

      t2 = Time.now

      if failures + successes > 0
        logger.info("Performed #{failures + successes} jobs in #{t2 - t1} seconds")
      end

      return if number_to_run && number_to_run <= 0
      sleep(poll_interval)
    end
  rescue StandardError => e
    logger.warn("Exiting due to #{e}")
    return
  ensure
    logger.info("Unlocking")
    unlock_all
  end

end

#set_error(job, error) ⇒ Object


49
50
51
52
53
# File 'lib/worker/queue.rb', line 49

def set_error(job, error)
  db[:worker_jobs]
      .where(id: job[:id])
      .update(last_error: error.backtrace.join("\n"), run_at: Time.now + 10)
end

#unlock_allObject


98
99
100
# File 'lib/worker/queue.rb', line 98

def unlock_all
  db[:worker_jobs].where(locked_by: id).update(locked_by: nil)
end