Class: Gandalf::Worker

Inherits:
Object
  • Object
show all
Includes:
DataMapper::Resource
Defined in:
lib/gandalf/worker.rb

Overview

A magical slave

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_exception(job, exception) ⇒ Object



34
35
36
37
# File 'lib/gandalf/worker.rb', line 34

# Prints the given exception and then raises it again.
#
# NOTE(review): the receiver `@crawl_scheduler` is evaluated at the point where
# this `def` itself executes. If that instance variable is still nil there, the
# method ends up attached to nil instead of a scheduler object -- confirm that
# this definition runs after the scheduler has been assigned.
#
# @param job [Object] not used by the body
# @param exception [Object] passed to `puts` and then to `raise`
# @raise always re-raises `exception`
def @crawl_scheduler.handle_exception(job, exception)
  puts exception
  raise exception
end

Instance Method Details

#crawl(jobs) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/gandalf/worker.rb', line 47

# Fetches and parses the feeds for every job in a single call, then handles
# each job according to what came back for its URL.
#
# A result that is a Feedzirra::Parser::RSS is handed to #save_posts together
# with the job's :id; anything else (including no entry at all for the URL)
# sends the job to #handle_error.
#
# @param jobs [Hash] jobs keyed by feed URL; each value is read with `job[:id]`
# @return [Hash] the `jobs` argument, as returned by `each`
def crawl(jobs)
  feeds = Feedzirra::Feed.fetch_and_parse(jobs.keys)

  jobs.each do |url, job|
    feed = feeds[url]

    unless feed.is_a?(Feedzirra::Parser::RSS)
      handle_error(job)
      next
    end

    save_posts(feed, job[:id])
  end
end

#handle_error(job) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/gandalf/worker.rb', line 72

# Records one more failure on the job and decides whether to retry it.
#
# The job's :errors counter is incremented when it already holds an integer
# and is set to 1 otherwise. Once the counter reaches `max_errors` the job is
# printed and not re-queued; below that it is pushed back onto the queue.
#
# @param job [Hash] job record; its :errors entry is updated in place
# @return [Object] whatever `puts` or `@queue.push` returns
def handle_error(job)
  previous = job[:errors]
  # Integer rather than Fixnum: Fixnum was folded into Integer in Ruby 2.4 and
  # the constant was removed in 3.2, where referencing it raises NameError.
  # Integer also matches values that were Fixnum on older interpreters.
  job[:errors] = previous.is_a?(Integer) ? previous + 1 : 1

  if job[:errors] >= max_errors
    puts job
  else
    @queue.push(job)
  end
end

#jobs_to_do ⇒ Object



86
87
88
# File 'lib/gandalf/worker.rb', line 86

# Reports how many jobs are waiting, as given by the queue's `length`.
#
# @return [Object] the value of `@queue.length`
def jobs_to_do
  @queue.length
end

#new_jobs(count) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/gandalf/worker.rb', line 96

# Takes jobs off the queue with `pop_first(count)` and indexes them by URL.
#
# When two popped jobs share the same :url, the later one replaces the
# earlier one in the result.
#
# @param count [Object] forwarded unchanged to `@queue.pop_first`
# @return [Hash] the popped jobs keyed by their :url value
def new_jobs(count)
  @queue.pop_first(count).each_with_object({}) do |job, by_url|
    by_url[job[:url]] = job
  end
end

#push(jobs) ⇒ Object



90
91
92
93
94
# File 'lib/gandalf/worker.rb', line 90

# Adds each of the given jobs to the queue, one `push` call per job, in order.
#
# @param jobs [#each] the jobs to enqueue
# @return [Object] whatever `jobs.each` returns
def push(jobs)
  jobs.each { |job| @queue.push(job) }
end

#run ⇒ Object



26
27
28
29
30
31
32
# File 'lib/gandalf/worker.rb', line 26

# Starts a scheduler if none exists yet and registers a recurring crawl on it.
#
# Every `interval`, the scheduled block checks #jobs_to_do; when it is above
# zero it pulls `max_jobs` jobs via #new_jobs and passes them to #crawl.
# Each call to this method registers another recurring block on the scheduler.
#
# @return [Object] whatever the scheduler's `every` call returns
def run
  @crawl_scheduler ||= Rufus::Scheduler.start_new

  @crawl_scheduler.every(interval) do
    next unless jobs_to_do > 0

    crawl(new_jobs(max_jobs))
  end
end

#save_posts(feed, channel_id) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/gandalf/worker.rb', line 59

# Parses the feed into posts with the configured post class and saves them.
#
# Each post is assigned the channel id and cleaned before it is saved. The
# loop stops at the first post whose save raises MysqlError; that error is
# discarded and the remaining posts are left untouched.
#
# @param feed [Object] passed to `@Post.parse`
# @param channel_id [Object] assigned to every post's `channel_id`
# @return [Object, nil] the result of `each`, or nil when the loop was broken
def save_posts(feed, channel_id)
  @Post.parse(feed).each do |post|
    post.channel_id = channel_id
    post.clean!
    begin
      post.save
    rescue MysqlError
      break
    end
  end
end

#setup(options = {}) ⇒ Object



15
16
17
18
19
20
21
22
23
24
# File 'lib/gandalf/worker.rb', line 15

# Builds this worker's queue and selects the class used to parse posts.
#
# The queue is a RedisQueue keyed by this object's `id`, given the :redis
# option as-is and the `redis_host` of this object's `scheduler`. The post
# class is the :post_class option when it is truthy, otherwise `Post`.
#
# @param options [Hash] recognised keys are :redis and :post_class
# @return [Object] the post class that was stored
def setup(options = {})
  @queue = RedisQueue.new(:key => id,
                          :redis => options[:redis],
                          :host => scheduler.redis_host)
  @Post = options[:post_class] || Post
end

#stop ⇒ Object



39
40
41
# File 'lib/gandalf/worker.rb', line 39

# Stops the crawl scheduler, if one has been created.
#
# Does nothing when no scheduler is stored in `@crawl_scheduler`, so calling
# this before #run no longer fails with NoMethodError on nil.
#
# @return [Object, nil] the result of the scheduler's `stop`, or nil when
#   there is no scheduler
def stop
  @crawl_scheduler.stop if @crawl_scheduler
end