Class: WorkerArmy::Queue

Inherits:
Base
  • Object
show all
Defined in:
lib/worker_army/queue.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#callback_retry_count, client_retry_count, config, #config, #worker_retry_count

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
15
# File 'lib/worker_army/queue.rb', line 11

# Builds a queue handle: caches the configuration, makes sure the shared
# Redis client exists and obtains a logger.
def initialize
  # `config` is not defined in this class; it is listed among the methods
  # inherited from Base.
  @config = config
  # The return value is discarded; the call is made so that the shared
  # client is created before any queue operation runs.
  Queue.redis_instance
  @log = WorkerArmy::Log.new.log
end

Class Method Details

.close_redis_connection ⇒ Object



27
28
29
30
# File 'lib/worker_army/queue.rb', line 27

# Closes the shared Redis connection (if any) and forgets the client.
#
# The global is cleared in an `ensure` clause so that a failing QUIT does not
# leave a dead client cached in $redis; the error itself still propagates to
# the caller.
#
# @return [nil]
def close_redis_connection
  $redis.quit if $redis
  nil
ensure
  $redis = nil
end

.redis_instance ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/worker_army/queue.rb', line 18

# Returns the process-wide Redis client, creating it on first use.
#
# Connection settings come from the 'redis_host', 'redis_port' and
# 'redis_auth' configuration keys. The password is handed to the client when
# it is constructed instead of issuing an AUTH command on every call, which
# cost an extra round trip for each Redis operation.
#
# @return [Redis] the shared client stored in $redis
def redis_instance
  $config ||= config
  $redis ||= Redis.new(host: $config['redis_host'],
                       port: $config['redis_port'],
                       password: $config['redis_auth'])
end

Instance Method Details

#add_current_job(job_id) ⇒ Object



92
93
94
# File 'lib/worker_army/queue.rb', line 92

# Adds +job_id+ to the 'current_jobs' Redis set.
#
# @param job_id [Object] identifier to record
# @return [Object] the Redis client's reply to SADD
def add_current_job(job_id)
  redis = Queue.redis_instance
  redis.sadd('current_jobs', job_id)
end

#add_failed_callback_job(job_id) ⇒ Object



120
121
122
# File 'lib/worker_army/queue.rb', line 120

# Prepends +job_id+ to the 'failed_callback_jobs' Redis list.
#
# @param job_id [Object] identifier of the job whose callback failed
# @return [Object] the Redis client's reply to LPUSH
def add_failed_callback_job(job_id)
  redis = Queue.redis_instance
  redis.lpush('failed_callback_jobs', job_id)
end

#add_failed_job(job_id) ⇒ Object



116
117
118
# File 'lib/worker_army/queue.rb', line 116

# Prepends +job_id+ to the 'failed_jobs' Redis list.
#
# @param job_id [Object] identifier of the failed job
# @return [Object] the Redis client's reply to LPUSH
def add_failed_job(job_id)
  redis = Queue.redis_instance
  redis.lpush('failed_jobs', job_id)
end

#clear_current_jobs ⇒ Object



108
109
110
# File 'lib/worker_army/queue.rb', line 108

# Deletes the 'current_jobs' Redis key.
#
# @return [Object] the Redis client's reply to DEL
def clear_current_jobs
  redis = Queue.redis_instance
  redis.del('current_jobs')
end

#current_jobs ⇒ Object



104
105
106
# File 'lib/worker_army/queue.rb', line 104

# Lists the members of the 'current_jobs' Redis set.
#
# @return [Object] the Redis client's reply to SMEMBERS
def current_jobs
  redis = Queue.redis_instance
  redis.smembers('current_jobs')
end

#current_jobs_count ⇒ Object



100
101
102
# File 'lib/worker_army/queue.rb', line 100

# Reports the cardinality of the 'current_jobs' Redis set.
#
# @return [Object] the Redis client's reply to SCARD
def current_jobs_count
  redis = Queue.redis_instance
  redis.scard('current_jobs')
end

#deliver_callback(job_id, callback_url, data, retry_count = 0) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/worker_army/queue.rb', line 71

# POSTs a job result as JSON to an external callback URL.
#
# Only the part of +callback_url+ after the last "?callback_url=" marker is
# used as the request target. A 404 or 500 response code is logged and the
# job is recorded as a failed callback without retrying. Any StandardError
# raised while delivering is logged and the delivery is attempted again after
# sleeping `retry_count * 2` seconds, until `callback_retry_count(@config)`
# is reached; then the job is recorded as a failed callback.
#
# @param job_id [Object] identifier recorded when delivery fails
# @param callback_url [String] URL (optionally wrapped in "?callback_url=")
# @param data [#to_json] payload to send
# @param retry_count [Integer] number of attempts already made
# @return [Object, nil] the result of add_failed_callback_job when the
#   delivery is given up, otherwise nil
def deliver_callback(job_id, callback_url, data, retry_count = 0)
  endpoint = callback_url.split("?callback_url=").last
  response = RestClient.post(endpoint, data.to_json, :content_type => :json, :accept => :json)
  status = response.code
  if status == 404 || status == 500
    @log.error("Response from callback url: #{status}")
    add_failed_callback_job(job_id)
  end
rescue => e
  @log.error(e)
  retry_count += 1
  if retry_count < callback_retry_count(@config)
    @log.debug("Delivering external callback failed! Retrying (#{retry_count})...")
    sleep(retry_count * 2)
    # Re-runs the method body; retry_count keeps its incremented value, so
    # the number of attempts stays bounded.
    retry
  else
    add_failed_callback_job(job_id)
  end
end

#failed_callback_jobs ⇒ Object



136
137
138
# File 'lib/worker_army/queue.rb', line 136

# Returns every entry of the 'failed_callback_jobs' Redis list.
#
# The whole list is requested with the 0..-1 range in a single command
# rather than first asking for the list length and using it as the stop
# index.
#
# @return [Object] the Redis client's reply to LRANGE
def failed_callback_jobs
  Queue.redis_instance.lrange('failed_callback_jobs', 0, -1)
end

#failed_callback_jobs_count ⇒ Object



132
133
134
# File 'lib/worker_army/queue.rb', line 132

# Reports the length of the 'failed_callback_jobs' Redis list.
#
# @return [Object] the Redis client's reply to LLEN
def failed_callback_jobs_count
  redis = Queue.redis_instance
  redis.llen('failed_callback_jobs')
end

#failed_jobs ⇒ Object



128
129
130
# File 'lib/worker_army/queue.rb', line 128

# Returns every entry of the 'failed_jobs' Redis list.
#
# The whole list is requested with the 0..-1 range in a single command
# rather than first asking for the list length and using it as the stop
# index.
#
# @return [Object] the Redis client's reply to LRANGE
def failed_jobs
  Queue.redis_instance.lrange('failed_jobs', 0, -1)
end

#failed_jobs_count ⇒ Object



124
125
126
# File 'lib/worker_army/queue.rb', line 124

# Reports the length of the 'failed_jobs' Redis list.
#
# @return [Object] the Redis client's reply to LLEN
def failed_jobs_count
  redis = Queue.redis_instance
  redis.llen('failed_jobs')
end

#finished_jobs_count ⇒ Object



171
172
173
# File 'lib/worker_army/queue.rb', line 171

# Reports the length of the 'jobs' Redis list.
#
# @return [Object] the Redis client's reply to LLEN
def finished_jobs_count
  redis = Queue.redis_instance
  redis.llen('jobs')
end

#get_job_count(queue_prefix = "queue") ⇒ Object



175
176
177
# File 'lib/worker_army/queue.rb', line 175

# Reads the "<queue_prefix>_counter" key and returns it as an Integer.
#
# Uses the explicit GET command (as #last_ping does) rather than the `[]`
# alias on the client. A missing counter yields 0 because nil.to_i is 0.
#
# @param queue_prefix [String] prefix of the counter key
# @return [Integer] the counter value
def get_job_count(queue_prefix = "queue")
  Queue.redis_instance.get("#{queue_prefix}_counter").to_i
end

#get_known_queues ⇒ Object



167
168
169
# File 'lib/worker_army/queue.rb', line 167

# Lists the members of the 'known_queues' Redis set.
#
# @return [Object] the Redis client's reply to SMEMBERS
def get_known_queues
  redis = Queue.redis_instance
  redis.smembers('known_queues')
end

#get_known_workers(recent_worker_pings = 1000) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/worker_army/queue.rb', line 149

# Returns the most recent ping of every worker seen within the last hour.
#
# Pings are read from the head of the 'workers' Redis list, parsed from JSON
# and ordered newest first. A worker is identified by its
# ('host_name', 'worker_pid') pair; only the newest ping per worker is kept,
# and pings whose 'timestamp' is 3600 seconds old or older are dropped.
#
# @param recent_worker_pings [Integer] stop index passed to LRANGE
# @return [Array<Hash>] newest ping per active worker, newest first
def get_known_workers(recent_worker_pings = 1000)
  raw_pings = Queue.redis_instance.lrange('workers', 0, recent_worker_pings)
  return [] unless raw_pings

  newest_first = raw_pings.map { |json| JSON.parse(json) }
                          .sort_by { |ping| ping['timestamp'].to_i }
                          .reverse
  # uniq keeps the first element for each key, i.e. the newest ping of each
  # worker, in a single pass instead of rescanning the list per worker.
  latest_per_worker = newest_first.uniq { |ping| [ping['host_name'], ping['worker_pid']] }
  now = Time.now.utc.to_i
  latest_per_worker.select { |ping| now - ping['timestamp'].to_i < 3600 }
end

#job_data(job_id) ⇒ Object



112
113
114
# File 'lib/worker_army/queue.rb', line 112

# Fetches the value stored under "job_<job_id>".
#
# Uses the explicit GET command (as #last_ping does) rather than the `[]`
# alias on the client.
#
# @param job_id [Object] identifier interpolated into the key
# @return [Object] the Redis client's reply to GET
def job_data(job_id)
  Queue.redis_instance.get("job_#{job_id}")
end

#last_ping ⇒ Object



145
146
147
# File 'lib/worker_army/queue.rb', line 145

# Reads the 'last_ping' Redis key.
#
# @return [Object] the Redis client's reply to GET
def last_ping
  redis = Queue.redis_instance
  redis.get('last_ping')
end

#ping(data) ⇒ Object



140
141
142
143
# File 'lib/worker_army/queue.rb', line 140

# Records a worker ping.
#
# The ping is prepended to the 'workers' list as JSON and its timestamp is
# stored under 'last_ping'. The timestamp is looked up under the :timestamp
# key first and then under 'timestamp', so a string-keyed hash no longer
# records 0 as the last ping.
#
# @param data [Hash] ping attributes; responds to #to_json
# @return [Object] the Redis client's reply to SET
def ping(data)
  redis = Queue.redis_instance
  redis.lpush('workers', data.to_json)
  timestamp = data[:timestamp] || data['timestamp']
  redis.set('last_ping', timestamp.to_i)
end

#pop(job_class_name, queue_prefix = "queue") ⇒ Object



51
52
53
54
# File 'lib/worker_army/queue.rb', line 51

# Takes the next entry from the "<queue_prefix>_<job_class_name>" list using
# the blocking BLPOP command.
#
# @param job_class_name [Object] job class part of the list name
# @param queue_prefix [String] prefix part of the list name
# @return [Object] the Redis client's reply to BLPOP
# @raise [RuntimeError] when no Redis client is available
def pop(job_class_name, queue_prefix = "queue")
  redis = Queue.redis_instance
  raise "No redis connection!" unless redis

  redis.blpop("#{queue_prefix}_#{job_class_name}")
end

#push(data, queue_prefix = "queue") ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/worker_army/queue.rb', line 33

# Enqueues a job description.
#
# Increments the "<queue_prefix>_counter" and "<queue_name>_counter" keys,
# registers the queue name in the 'known_queues' set, and appends the job
# (the given data plus the generated bookkeeping fields) as JSON to the
# queue's list.
#
# @param data [Hash] job attributes; 'job_class' names the queue and an
#   optional 'queue_prefix' overrides the prefix argument
# @param queue_prefix [String] prefix used for the counter and the queue name
# @return [Hash] :job_count, :job_id, :queue_count and :queue_name
# @raise [RuntimeError] "No data" when data is nil/false, or
#   "No redis connection!" when no Redis client is available
def push(data, queue_prefix = "queue")
  redis = Queue.redis_instance
  raise "No data" unless data
  raise "No redis connection!" unless redis

  # The overall counter is keyed by the prefix argument, even when the data
  # carries its own 'queue_prefix' for the queue name.
  job_count = redis.incr("#{queue_prefix}_counter")
  effective_prefix = data['queue_prefix'] || queue_prefix
  queue_name = "#{effective_prefix}_#{data['job_class']}"
  redis.sadd('known_queues', queue_name)
  queue_count = redis.incr("#{queue_name}_counter")
  job_id = SecureRandom.uuid

  payload = data.merge(job_count: job_count, queue_count: queue_count,
                       job_id: job_id, queue_name: queue_name)
  redis.rpush(queue_name, payload.to_json)

  { job_count: job_count, job_id: job_id, queue_count: queue_count,
    queue_name: queue_name }
end

#remove_current_job(job_id) ⇒ Object



96
97
98
# File 'lib/worker_army/queue.rb', line 96

# Removes +job_id+ from the 'current_jobs' Redis set.
#
# @param job_id [Object] identifier to remove
# @return [Object] the Redis client's reply to SREM
def remove_current_job(job_id)
  redis = Queue.redis_instance
  redis.srem('current_jobs', job_id)
end

#save_result(data) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/worker_army/queue.rb', line 56

# Stores the outcome of a finished job and triggers its callback.
#
# When the 'store_job_data' setting is truthy the complete data hash is
# saved as JSON under "job_<job_id>" (with the explicit SET command, as #ping
# does, rather than the `[]=` alias). The job id is always prepended to the
# 'jobs' list. If the data carries a 'callback_url', that key is removed from
# the passed-in hash and the remaining data is delivered to the URL.
#
# @param data [Hash, nil] job result with 'job_id' and optional 'callback_url'
# @return [Object, nil] the result of deliver_callback when a callback URL is
#   present, otherwise nil
def save_result(data)
  return unless data

  job_id = data['job_id']
  callback_url = data['callback_url']
  redis = Queue.redis_instance
  redis.set("job_#{job_id}", data.to_json) if @config['store_job_data']
  redis.lpush('jobs', job_id)
  return unless callback_url

  # The stored JSON above still contains the URL; only the payload sent to
  # the callback has it stripped.
  data.delete("callback_url")
  deliver_callback(job_id, callback_url, data)
end