18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/cobweb_crawl_helper.rb', line 18
# Cancels this crawl and cleans up its queued work.
#
# NOTE: +options+ is the same Hash object as +@data+ (not a copy), so the
# defaults filled in below are written back into @data and are what
# statistics.end_crawl receives.
#
# Steps:
# 1. Default :queue_name to "cobweb_crawl_job", and :processing_queue /
#    :crawl_finished_queue to the worker names of an installed queue system.
#    The system named by options[:queue_system] is tried first, so a :sidekiq
#    crawl on a host that also has Resque installed gets the Sidekiq workers.
# 2. Call statistics.end_crawl(@data, true), then poll get_status once per
#    second, at most 200 times, stopping as soon as it reports CANCELLED.
# 3. Remove this crawl's jobs (args[0]["crawl_id"] == id) from the Resque or
#    Sidekiq queues, depending on options[:queue_system].
# 4. If :crawl_finished_queue is set and the configured queue system is
#    installed, enqueue that worker with get_statistics merged with
#    :crawl_id, :crawled_base_url and, when present, :redis_options and
#    :source_id.
#
# :processing_queue and :crawl_finished_queue may be configured either as a
# class or as a class name (String/Symbol).
def destroy
  options = @data
  options[:queue_name] = "cobweb_crawl_job" unless options.has_key?(:queue_name)

  default_workers = {
    :resque => ["CobwebJob", "CobwebFinishedJob"],
    :sidekiq => ["CrawlWorker", "CrawlFinishedWorker"]
  }
  installed = { :resque => RESQUE_INSTALLED, :sidekiq => SIDEKIQ_INSTALLED }
  # Try the configured queue system first. Otherwise, with both gems
  # installed, a :sidekiq crawl would be given the Resque worker names and
  # the Sidekiq purge below would call sidekiq_options_hash on the wrong class.
  systems = options[:queue_system] == :sidekiq ? [:sidekiq, :resque] : [:resque, :sidekiq]
  systems.each do |system|
    next unless installed[system]
    processing_worker, finished_worker = default_workers[system]
    options[:processing_queue] = processing_worker unless options.has_key?(:processing_queue)
    options[:crawl_finished_queue] = finished_worker unless options.has_key?(:crawl_finished_queue)
  end

  self.statistics.end_crawl(@data, true)
  # Give the crawl up to 200 seconds to acknowledge the cancellation.
  200.times do
    break if self.statistics.get_status == CANCELLED
    sleep 1
  end

  # Accepts a worker given as a class or as a class name and returns the class.
  resolve_worker = lambda { |worker| worker.is_a?(Module) ? worker : Kernel.const_get(worker.to_s) }
  use_resque = options[:queue_system] == :resque && RESQUE_INSTALLED
  use_sidekiq = options[:queue_system] == :sidekiq && SIDEKIQ_INSTALLED

  if use_resque
    # Walk the queue from its tail towards its head in BATCH_SIZE windows, so
    # removing a matching job does not move the entries still to be read
    # (those before the current window).
    position = Resque.size(options[:queue_name])
    until position == 0
      position = [position - BATCH_SIZE, 0].max
      Resque.peek(options[:queue_name], position, BATCH_SIZE).each do |item|
        Resque.dequeue(CrawlJob, item["args"][0]) if item["args"][0]["crawl_id"] == id
      end
    end
  end

  if use_sidekiq
    Sidekiq::Queue.new("crawl_worker").each do |job|
      job.delete if job.args[0]["crawl_id"] == id
    end
    process_queue_name = resolve_worker.call(options[:processing_queue]).sidekiq_options_hash["queue"]
    Sidekiq::Queue.new(process_queue_name).each do |job|
      job.delete if job.args[0]["crawl_id"] == id
    end
  end

  finished_queue = options[:crawl_finished_queue]
  return unless finished_queue && (use_resque || use_sidekiq)

  additional_stats = {:crawl_id => id, :crawled_base_url => @stats.redis.get("crawled_base_url")}
  additional_stats[:redis_options] = @data[:redis_options] unless @data[:redis_options] == {}
  additional_stats[:source_id] = options[:source_id] unless options[:source_id].nil?
  payload = @stats.get_statistics.merge(additional_stats)

  # Resque.enqueue needs the job class itself (it reads the class's @queue to
  # pick a queue), so a configured class name must be resolved first.
  finished_worker = resolve_worker.call(finished_queue)
  if use_resque
    Resque.enqueue(finished_worker, payload)
  else
    finished_worker.perform_async(payload)
  end
end
|