Class: WorkerArmy::Queue
- Inherits:
-
Base
- Object
- Base
- WorkerArmy::Queue
show all
- Defined in:
- lib/worker_army/queue.rb
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Base
#callback_retry_count, client_retry_count, config, #config, #worker_retry_count
Constructor Details
#initialize ⇒ Queue
Returns a new instance of Queue.
Class Method Details
.close_redis_connection ⇒ Object
27
28
29
30
|
# File 'lib/worker_army/queue.rb', line 27
# Sends QUIT on the shared Redis connection held in $redis (when one is
# set) and then clears the global.
def close_redis_connection
  connection = $redis
  connection.quit if connection
  $redis = nil
end
|
.redis_instance ⇒ Object
18
19
20
21
22
23
24
25
|
# File 'lib/worker_army/queue.rb', line 18
# Returns the shared Redis client, creating it on first use.
#
# The configuration is cached in $config and the client in $redis. The
# optional 'redis_auth' value is handed to the client as its password so
# that authentication happens when the connection is established, instead
# of issuing an extra AUTH command on every lookup.
def redis_instance
  $config ||= config
  $redis ||= Redis.new(host: $config['redis_host'],
                       port: $config['redis_port'],
                       password: $config['redis_auth'])
end
|
Instance Method Details
#add_current_job(job_id) ⇒ Object
92
93
94
|
# File 'lib/worker_army/queue.rb', line 92
# Adds +job_id+ to the 'current_jobs' Redis set.
def add_current_job(job_id)
  redis = Queue.redis_instance
  redis.sadd('current_jobs', job_id)
end
|
#add_failed_callback_job(job_id) ⇒ Object
120
121
122
|
# File 'lib/worker_army/queue.rb', line 120
# Pushes +job_id+ onto the head of the 'failed_callback_jobs' Redis list.
def add_failed_callback_job(job_id)
  redis = Queue.redis_instance
  redis.lpush('failed_callback_jobs', job_id)
end
|
#add_failed_job(job_id) ⇒ Object
116
117
118
|
# File 'lib/worker_army/queue.rb', line 116
# Pushes +job_id+ onto the head of the 'failed_jobs' Redis list.
def add_failed_job(job_id)
  redis = Queue.redis_instance
  redis.lpush('failed_jobs', job_id)
end
|
#clear_current_jobs ⇒ Object
108
109
110
|
# File 'lib/worker_army/queue.rb', line 108
# Deletes the 'current_jobs' key from Redis.
def clear_current_jobs
  redis = Queue.redis_instance
  redis.del('current_jobs')
end
|
#current_jobs ⇒ Object
104
105
106
|
# File 'lib/worker_army/queue.rb', line 104
# Returns the members of the 'current_jobs' Redis set.
def current_jobs
  redis = Queue.redis_instance
  redis.smembers('current_jobs')
end
|
#current_jobs_count ⇒ Object
100
101
102
|
# File 'lib/worker_army/queue.rb', line 100
# Returns the cardinality of the 'current_jobs' Redis set.
def current_jobs_count
  redis = Queue.redis_instance
  redis.scard('current_jobs')
end
|
#deliver_callback(job_id, callback_url, data, retry_count = 0) ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/worker_army/queue.rb', line 71
# POSTs +data+ as JSON to the callback target of a job.
#
# The target is whatever follows the last "?callback_url=" in
# +callback_url+ (the whole string when that marker is absent). A 404 or
# 500 response code is logged and the job is recorded via
# add_failed_callback_job. Any StandardError is logged and the delivery is
# attempted again after sleeping retry_count * 2 seconds, until
# retry_count reaches callback_retry_count(@config); at that point the job
# is recorded as a failed callback instead.
#
# @param job_id id recorded when delivery ultimately fails
# @param callback_url [String] URL, optionally wrapped as "...?callback_url=<target>"
# @param data payload; serialized with #to_json
# @param retry_count [Integer] number of attempts that have already failed
def deliver_callback(job_id, callback_url, data, retry_count = 0)
  target_url = callback_url.split("?callback_url=").last
  response = RestClient.post(target_url, data.to_json, :content_type => :json, :accept => :json)
  status = response.code
  if status == 404 || status == 500
    @log.error("Response from callback url: #{status}")
    add_failed_callback_job(job_id)
  end
rescue => e
  @log.error(e)
  retry_count += 1
  if retry_count < callback_retry_count(@config)
    @log.debug("Delivering external callback failed! Retrying (#{retry_count})...")
    sleep(retry_count * 2)
    # Re-run the method body; retry_count keeps its incremented value.
    retry
  else
    add_failed_callback_job(job_id)
  end
end
|
#failed_callback_jobs ⇒ Object
136
137
138
|
# File 'lib/worker_army/queue.rb', line 136
# Returns every entry of the 'failed_callback_jobs' Redis list.
#
# A stop index of -1 asks LRANGE for the whole list, which avoids a
# separate LLEN round-trip and the window in which the list could grow
# between the two commands.
def failed_callback_jobs
  Queue.redis_instance.lrange('failed_callback_jobs', 0, -1)
end
|
#failed_callback_jobs_count ⇒ Object
132
133
134
|
# File 'lib/worker_army/queue.rb', line 132
# Returns the length of the 'failed_callback_jobs' Redis list.
def failed_callback_jobs_count
  redis = Queue.redis_instance
  redis.llen('failed_callback_jobs')
end
|
#failed_jobs ⇒ Object
128
129
130
|
# File 'lib/worker_army/queue.rb', line 128
# Returns every entry of the 'failed_jobs' Redis list.
#
# A stop index of -1 asks LRANGE for the whole list, which avoids a
# separate LLEN round-trip and the window in which the list could grow
# between the two commands.
def failed_jobs
  Queue.redis_instance.lrange('failed_jobs', 0, -1)
end
|
#failed_jobs_count ⇒ Object
124
125
126
|
# File 'lib/worker_army/queue.rb', line 124
# Returns the length of the 'failed_jobs' Redis list.
def failed_jobs_count
  redis = Queue.redis_instance
  redis.llen('failed_jobs')
end
|
#finished_jobs_count ⇒ Object
171
172
173
|
# File 'lib/worker_army/queue.rb', line 171
# Returns the length of the 'jobs' Redis list.
def finished_jobs_count
  redis = Queue.redis_instance
  redis.llen('jobs')
end
|
#get_job_count(queue_prefix = "queue") ⇒ Object
175
176
177
|
# File 'lib/worker_army/queue.rb', line 175
# Returns the value stored under "<queue_prefix>_counter" as an Integer
# (0 when the key holds nothing, because nil.to_i is 0).
def get_job_count(queue_prefix = "queue")
  counter_key = "#{queue_prefix}_counter"
  stored = Queue.redis_instance[counter_key]
  stored.to_i
end
|
#get_known_queues ⇒ Object
167
168
169
|
# File 'lib/worker_army/queue.rb', line 167
# Returns the members of the 'known_queues' Redis set.
def get_known_queues
  redis = Queue.redis_instance
  redis.smembers('known_queues')
end
|
#get_known_workers(recent_worker_pings = 1000) ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/worker_army/queue.rb', line 149
# Returns the most recent ping of every worker seen within the last hour.
#
# Reads entries 0..recent_worker_pings of the 'workers' Redis list, parses
# each as JSON, and keeps, per (host_name, worker_pid) pair, the ping with
# the highest timestamp. Pings whose timestamp is 3600 seconds or more
# behind the current UTC time are dropped.
#
# @param recent_worker_pings [Integer] stop index passed to LRANGE
# @return [Array<Hash>] parsed ping hashes, newest first; [] when the
#   list lookup returns nil
def get_known_workers(recent_worker_pings = 1000)
  raw_pings = Queue.redis_instance.lrange('workers', 0, recent_worker_pings)
  return [] unless raw_pings

  newest_first = raw_pings.map { |json| JSON.parse(json) }
                          .sort_by { |ping| ping['timestamp'].to_i }
                          .reverse
  # uniq keeps the first element for each key, i.e. the newest ping of
  # each worker, and preserves the newest-first ordering.
  latest_per_worker = newest_first.uniq { |ping| [ping['host_name'], ping['worker_pid']] }
  now = Time.now.utc.to_i
  latest_per_worker.select { |ping| now - ping['timestamp'].to_i < 3600 }
end
|
#job_data(job_id) ⇒ Object
112
113
114
|
# File 'lib/worker_army/queue.rb', line 112
# Returns the value stored in Redis under "job_<job_id>".
def job_data(job_id)
  data_key = "job_#{job_id}"
  Queue.redis_instance[data_key]
end
|
#last_ping ⇒ Object
145
146
147
|
# File 'lib/worker_army/queue.rb', line 145
# Returns the value stored in Redis under 'last_ping'.
def last_ping
  redis = Queue.redis_instance
  redis.get('last_ping')
end
|
#pop(job_class_name, queue_prefix = "queue") ⇒ Object
51
52
53
54
|
# File 'lib/worker_army/queue.rb', line 51
# Issues BLPOP on the list "<queue_prefix>_<job_class_name>" and returns
# the client's reply.
#
# @raise [RuntimeError] "No redis connection!" when Queue.redis_instance
#   is nil or false
def pop(job_class_name, queue_prefix = "queue")
  raise "No redis connection!" unless Queue.redis_instance

  list_key = "#{queue_prefix}_#{job_class_name}"
  Queue.redis_instance.blpop(list_key)
end
|
#push(data, queue_prefix = "queue") ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/worker_army/queue.rb', line 33
# Enqueues a job description and returns its bookkeeping details.
#
# Steps, in order:
# 1. increments "<queue_prefix>_counter" using the +queue_prefix+ argument;
# 2. lets data['queue_prefix'], when present, override the prefix used for
#    the queue name "<prefix>_<data['job_class']>";
# 3. registers the queue name in the 'known_queues' set and increments
#    "<queue_name>_counter";
# 4. appends +data+ merged with job_count, queue_count, a fresh UUID job_id
#    and queue_name (as JSON) to the tail of the queue list.
#
# @param data [Hash] job description; reads 'queue_prefix' and 'job_class'
# @param queue_prefix [String] prefix for the global counter and, unless
#   overridden by data, for the queue name
# @return [Hash] :job_count, :job_id, :queue_count and :queue_name
# @raise [RuntimeError] "No data" when +data+ is nil/false
# @raise [RuntimeError] "No redis connection!" when no client is available
def push(data, queue_prefix = "queue")
  # Look the client up once; validate before doing any work.
  redis = Queue.redis_instance
  raise "No data" unless data
  raise "No redis connection!" unless redis

  job_count = redis.incr("#{queue_prefix}_counter")
  queue_prefix = data['queue_prefix'] if data['queue_prefix']
  queue_name = "#{queue_prefix}_#{data['job_class']}"
  redis.sadd('known_queues', queue_name)
  queue_count = redis.incr("#{queue_name}_counter")
  job_id = SecureRandom.uuid
  payload = data.merge(job_count: job_count, queue_count: queue_count,
                       job_id: job_id, queue_name: queue_name)
  redis.rpush(queue_name, payload.to_json)
  { job_count: job_count, job_id: job_id, queue_count: queue_count,
    queue_name: queue_name }
end
|
#remove_current_job(job_id) ⇒ Object
96
97
98
|
# File 'lib/worker_army/queue.rb', line 96
# Removes +job_id+ from the 'current_jobs' Redis set.
def remove_current_job(job_id)
  redis = Queue.redis_instance
  redis.srem('current_jobs', job_id)
end
|
#save_result(data) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/worker_army/queue.rb', line 56
# Records a job result and, when requested, delivers its callback.
#
# Does nothing for nil/false +data+. Otherwise:
# - when @config['store_job_data'] is truthy, stores the full JSON of
#   +data+ under "job_<job_id>";
# - pushes the job id onto the head of the 'jobs' list;
# - when data['callback_url'] is set, removes that key from +data+
#   (mutating the caller's hash) and hands the rest to deliver_callback.
#
# @param data [Hash, nil] result; reads 'job_id' and 'callback_url'
# @return the value of deliver_callback when a callback URL was present,
#   otherwise nil
def save_result(data)
  return unless data

  job_id = data['job_id']
  callback_url = data['callback_url']
  Queue.redis_instance["job_#{job_id}"] = data.to_json if @config['store_job_data']
  Queue.redis_instance.lpush('jobs', job_id)
  return unless callback_url

  data.delete("callback_url")
  deliver_callback(job_id, callback_url, data)
end
|