Class: Delayed::HomeManager

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/delayed/home_manager.rb

Constant Summary collapse

DEFAULT_SLEEP_TIME =
2
DEFAULT_JOBS_TO_PULL =
20
DEFAULT_WORKERS_NUMBER =
16
DEFAULT_TIMEOUT =
20
DEFAULT_MAX_ATTEMPTS =
25

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ HomeManager

Returns a new instance of HomeManager.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/delayed/home_manager.rb', line 17

# Builds a manager and marks it alive.
#
# options - optional Hash (or Hash subclass) of settings; anything else,
#           including nil, is ignored and every setting takes its default.
#           Recognised keys:
#           :sleep_time     - interval handed to the polling timer in #start
#                             (default DEFAULT_SLEEP_TIME)
#           :workers_number - size of the worker pool (default DEFAULT_WORKERS_NUMBER)
#           :worker_options - passed on to the worker initializer (default {})
#           :queue          - queue to pull from; nil pulls from all queues
#           :timeout        - default DEFAULT_TIMEOUT
#           :max_attempts   - default DEFAULT_MAX_ATTEMPTS
def initialize(options=nil)
  self.alive = true

  # is_a? rather than an exact class comparison, so Hash subclasses are
  # honoured instead of being silently discarded.
  if options.is_a?(Hash)
    @sleep_time = options[:sleep_time]
    @workers_number = options[:workers_number]
    @worker_options = options[:worker_options]
    @queue = options[:queue]
    @timeout = options[:timeout]
    @max_attempts = options[:max_attempts]
  end

  # Anything missing (or explicitly nil/false) falls back to the defaults.
  @sleep_time ||= DEFAULT_SLEEP_TIME
  @timeout ||= DEFAULT_TIMEOUT
  @workers_number ||= DEFAULT_WORKERS_NUMBER
  @max_attempts ||= DEFAULT_MAX_ATTEMPTS
  @worker_options ||= {}

  # @queue deliberately has no default: if it is nil, we'll pull from all queues
end

Instance Attribute Details

#alive ⇒ Object

Returns the value of attribute alive.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the run flag. It is set to true in #initialize and to false by
# #kill; the timer block in #start cancels itself once it reads false.
def alive
  @alive
end

#hostname ⇒ Object

Returns the value of attribute hostname.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the identifier written to locked_by when #pull_next locks jobs
# and matched against locked_by in #unlock_all.
# NOTE(review): #initialize does not assign it — presumably set by the caller
# before #start; confirm.
def hostname
  @hostname
end

#max_attempts ⇒ Object

Returns the value of attribute max_attempts.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :max_attempts option (DEFAULT_MAX_ATTEMPTS when not given).
# Not read by any of the manager methods shown here.
def max_attempts
  @max_attempts
end

#queue ⇒ Object

Returns the value of attribute queue.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :queue option. The timer in #start passes it to #pull_next;
# nil means jobs are pulled from all queues.
def queue
  @queue
end

#sleep_time ⇒ Object

Returns the value of attribute sleep_time.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :sleep_time option (DEFAULT_SLEEP_TIME when not given).
# #start hands it to `every` as the polling interval — presumably seconds;
# confirm against the timer API.
def sleep_time
  @sleep_time
end

#timeout ⇒ Object

Returns the value of attribute timeout.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :timeout option (DEFAULT_TIMEOUT when not given).
# Not read by any of the manager methods shown here.
def timeout
  @timeout
end

#timer ⇒ Object

Returns the value of attribute timer.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the handle returned by `every` in #start; nil until #start runs.
# The timer block calls #cancel on it once the manager is no longer alive.
def timer
  @timer
end

#worker_options ⇒ Object

Returns the value of attribute worker_options.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :worker_options option (an empty Hash when not given).
# #start passes it as :args when building the HomeWorker pool.
def worker_options
  @worker_options
end

#workers_number ⇒ Object

Returns the value of attribute workers_number.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the :workers_number option (DEFAULT_WORKERS_NUMBER when not
# given). #start uses it as the :size of the HomeWorker pool.
def workers_number
  @workers_number
end

#workers_pool ⇒ Object

Returns the value of attribute workers_pool.



15
16
17
# File 'lib/delayed/home_manager.rb', line 15

# Reader for the HomeWorker pool created in #start; nil until #start runs.
# The timer asks it for idle_size and dispatches jobs through async.work.
def workers_pool
  @workers_pool
end

Instance Method Details

#kill ⇒ Object



105
106
107
# File 'lib/delayed/home_manager.rb', line 105

# Flags the manager as stopped. This only clears @alive: the timer created
# in #start sees the flag on its next tick and cancels itself. The worker
# pool and any job locks are left untouched here.
def kill
  @alive = false
end

#pull_next(queue = nil, n = 15) ⇒ Object

Pulls up to n jobs from Delayed::Job and locks them until they’re processed (the worker then deletes the job).



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/delayed/home_manager.rb', line 79

# Claims up to n runnable jobs for this manager and returns them.
#
# A job is runnable when it is unlocked (locked_at is null) and its run_at is
# null or in the past. Candidates are ordered by priority, run_at, id.
#
# queue - which queue to pull from:
#         nil        -> the queue column is ignored (all queues)
#         'default'  -> jobs whose queue is 'default' or null
#         other name -> jobs in exactly that queue
# n     - maximum number of jobs to claim; zero or negative claims nothing.
#
# Returns a Delayed::Job relation restricted to the jobs locked by this call.
def pull_next(queue=nil, n=15)
  # No capacity requested (e.g. the pool has no idle workers): skip the
  # transaction and both queries entirely.
  return Delayed::Job.where(:id => []) if n && n <= 0

  ids = []
  Delayed::Job.transaction do
    # One timestamp for both the "due" comparison and the lock stamp.
    now = Time.now.utc
    candidates = Delayed::Job.where('(run_at is null or run_at < ?) and locked_at is null', now).order('priority asc, run_at asc, id asc')
    if queue == 'default'
      candidates = candidates.where(:queue => ['default', nil])
    elsif queue
      candidates = candidates.where(:queue => queue)
    end

    ids = candidates.limit(n).pluck(:id)

    # Lock exactly the rows that were selected. Re-running the limited query
    # for the UPDATE could hit a different set of rows if jobs were inserted
    # or changed in between, leaving returned jobs unlocked and other jobs
    # locked but never dispatched. The locked_at guard keeps us from stealing
    # a job another manager claimed after our SELECT.
    unless ids.empty?
      Delayed::Job.where(:id => ids, :locked_at => nil).update_all(:locked_at => now, :locked_by => hostname)
    end
  end

  # Only hand back jobs that carry our lock, so a job claimed first by
  # another manager is not processed twice.
  Delayed::Job.where(:id => ids, :locked_by => hostname)
end

#start ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/delayed/home_manager.rb', line 38

# Boots the manager: builds the worker pool, releases locks left over from a
# previous run, then starts a recurring timer that feeds jobs to the pool.
#
# Returns the timer handle (also stored in @timer).
def start
  # Celluloid pool of @workers_number HomeWorker actors; @worker_options is
  # handed to HomeWorker's initializer.
  @workers_pool = HomeWorker.pool(:size => @workers_number, :args => @worker_options)

  begin
    # Jobs still locked under our hostname come from an earlier run; free them.
    unlock_all
  rescue => failure
    # Best effort: a failed unlock is logged (when a logger exists), not fatal.
    log = DelayedJobActiveRecordThreaded.logger
    if log
      log.error(failure.message)
      log.error(failure.backtrace.join("\n"))
    end
  end

  @timer = every(@sleep_time) do
    begin
      if @alive
        # Ask only for as many jobs as there are idle workers.
        capacity = @workers_pool.idle_size
        pull_next(@queue, capacity).each { |job| @workers_pool.async.work(job) }
      else
        # #kill was called: stop polling.
        @timer.cancel
      end
    rescue => failure
      begin
        log = DelayedJobActiveRecordThreaded.logger
        if log
          log.error(failure.message)
          log.error(failure.backtrace.join("\n"))
        end
      rescue
        # A failing logger must not take the polling loop down with it.
      end
    end
  end
end

#unlock_all ⇒ Object

Unlock all jobs locked by our hostname in prior attempts



71
72
73
74
75
# File 'lib/delayed/home_manager.rb', line 71

# Releases every job currently locked under our hostname (left over from
# prior attempts) by clearing locked_by and locked_at in one transaction.
#
# Returns whatever the transaction block yields, i.e. update_all's result.
def unlock_all
  cleared = { :locked_by => nil, :locked_at => nil }
  Delayed::Job.transaction do
    stale = Delayed::Job.where(:locked_by => hostname)
    stale.update_all(cleared)
  end
end
end