Class: Worker::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/worker/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Queue

Returns a new instance of Queue.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/worker/queue.rb', line 10

def initialize(opts={})
  @logger = Logger.new(STDOUT)

  db_config = opts[:db_config] || Worker.db
  db_config.merge(logger: logger) if opts[:log_queries]

  @db = Sequel.connect(db_config)

  name = `hostname`.chomp("\n")

  @id = "#{name}.#{Process.pid}.#{rand(0..1000)}-#{opts[:queue_name] || 'all'}"

  @jobs_per_worker = opts[:jobs_per_worker] || 5
  @max_job_time = opts[:max_job_time] || 5 * 60 # in seconds
  @queue_name = opts[:queue_name]
  @max_attempts = opts[:max_attempts] || 20
  @poll_interval = opts[:poll_interval] || 5 # 5 second polling
  @log_backtrace = !!opts[:log_backtrace]

  create_table
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/worker/queue.rb', line 7

def id
  @id
end

#jobs_per_workerObject

Returns the value of attribute jobs_per_worker.



8
9
10
# File 'lib/worker/queue.rb', line 8

def jobs_per_worker
  @jobs_per_worker
end

#log_backtraceObject

Returns the value of attribute log_backtrace.



8
9
10
# File 'lib/worker/queue.rb', line 8

def log_backtrace
  @log_backtrace
end

#loggerObject

Returns the value of attribute logger.



8
9
10
# File 'lib/worker/queue.rb', line 8

def logger
  @logger
end

#max_attemptsObject

Returns the value of attribute max_attempts.



8
9
10
# File 'lib/worker/queue.rb', line 8

def max_attempts
  @max_attempts
end

#max_job_timeObject

Returns the value of attribute max_job_time.



8
9
10
# File 'lib/worker/queue.rb', line 8

def max_job_time
  @max_job_time
end

#poll_intervalObject

Returns the value of attribute poll_interval.



8
9
10
# File 'lib/worker/queue.rb', line 8

def poll_interval
  @poll_interval
end

#queue_nameObject

Returns the value of attribute queue_name.



8
9
10
# File 'lib/worker/queue.rb', line 8

def queue_name
  @queue_name
end

Instance Method Details

#clean_locksObject

cleans locked by jobs that have been dormant for a while



58
59
60
# File 'lib/worker/queue.rb', line 58

def clean_locks
  db[:worker_jobs].where('locked_at < ?', Time.now - max_job_time).update(locked_by: nil)
end

#create_tableObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/worker/queue.rb', line 32

def create_table
  db.create_table? :worker_jobs do
    primary_key :id
    Time     :locked_at,  null: true
    Time     :run_at,     null: true
    String   :locked_by,  null: true
    String   :handler,    null: false, text: true
    String   :last_error, null: true,  text: true
    String   :queue,      null: false, default: 'default'
    String   :job_class,  null: false
    Integer  :attempts,   null: false, default: 0
    Integer  :priority,   null: false, default: 0
  end
end

#dbObject



91
92
93
# File 'lib/worker/queue.rb', line 91

def db
  @db
end

#dequeueObject

locks a set of jobs for this worker



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/worker/queue.rb', line 63

def dequeue
  query = db[:worker_jobs]
              .where(
                  '`run_at` < ? AND `locked_by` IS NULL AND `id` in (SELECT `id` FROM `worker_jobs` ORDER BY `priority` DESC LIMIT ?)',
                  Time.now, jobs_per_worker)

  if queue_name
    query = query.where(queue: queue_name)
  end

  query.update(locked_by: id, locked_at: Time.now)
  db[:worker_jobs].where(locked_by: id)
end

#enqueue(job) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/worker/queue.rb', line 77

def enqueue(job)
  db[:worker_jobs].insert(
      job_class: job.class.name,
      queue: job.queue,
      priority: job.priority,
      handler: job.serialize,
      run_at: job.run_at,
  )
end

#remove(id) ⇒ Object



53
54
55
# File 'lib/worker/queue.rb', line 53

def remove(id)
  db[:worker_jobs].where(id: id).delete
end

#run(number_to_run = nil) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/worker/queue.rb', line 95

def run(number_to_run=nil)
  begin
    while true

      logger.debug("Polling")

      t1 = Time.now
      failures = 0
      successes = 0

      dequeue.each do |item|

        if number_to_run
          number_to_run -= 1
        end

        logger.info("Performing #{item[:job_class]}.#{item[:id]} attempts=#{item[:attempts]} priority=#{item[:priority]} run_at=#{item[:run_at]}")

        job = nil
        begin
          job = eval(item[:job_class]).deserialize(item[:handler])

          Timeout::timeout(max_job_time) do
            job.before
            job.perform
            job.on_success
          end

          remove(item[:id])

          successes += 1
          logger.info("Successful #{item[:job_class]}.#{item[:id]}")

        rescue Exception => e

          job.on_failure if job

          if item[:attempts] + 1 > max_attempts
            remove(item[:id])
          else
            set_error(item, e)
          end

          failures += 1
          logger.info("Failed #{item[:job_class]}.#{item[:id]} err: #{e.class.name} #{e.message}")
          if log_backtrace
            puts e.backtrace
          end

        end
      end

      t2 = Time.now

      if failures + successes > 0
        logger.info("Performed #{failures + successes} jobs in #{t2 - t1} seconds")
      end

      return if number_to_run && number_to_run <= 0
      sleep(poll_interval)
    end
  rescue StandardError => e
    logger.warn("Exiting due to #{e}")
    return
  ensure
    logger.info("Unlocking")
    unlock_all
  end

end

#set_error(job, error) ⇒ Object



47
48
49
50
51
# File 'lib/worker/queue.rb', line 47

def set_error(job, error)
  db[:worker_jobs]
      .where(id: job[:id])
      .update(last_error: error.backtrace.join("\n"), run_at: Time.now + 10)
end

#unlock_allObject



87
88
89
# File 'lib/worker/queue.rb', line 87

def unlock_all
  db[:worker_jobs].where(locked_by: id).update(locked_by: nil)
end