Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
Plugins::MultiJobForks::RssReader
Defined in:
lib/resque-multi-job-forks.rb

Constant Summary

Constants included from Plugins::MultiJobForks::RssReader

Plugins::MultiJobForks::RssReader::LINUX, Plugins::MultiJobForks::RssReader::PS_CMD, Plugins::MultiJobForks::RssReader::UNITS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Plugins::MultiJobForks::RssReader

#rss, #rss_linux, #rss_posix

Instance Attribute Details

#jobs_per_forkObject

Returns the value of attribute jobs_per_fork.



9
10
11
# File 'lib/resque-multi-job-forks.rb', line 9

def jobs_per_fork
  @jobs_per_fork
end

#jobs_processedObject (readonly)

Returns the value of attribute jobs_processed.



11
12
13
# File 'lib/resque-multi-job-forks.rb', line 11

def jobs_processed
  @jobs_processed
end

#memory_thresholdObject

Returns the value of attribute memory_threshold.



10
11
12
# File 'lib/resque-multi-job-forks.rb', line 10

def memory_threshold
  @memory_threshold
end

#seconds_per_forkObject

Returns the value of attribute seconds_per_fork.



8
9
10
# File 'lib/resque-multi-job-forks.rb', line 8

def seconds_per_fork
  @seconds_per_fork
end

Class Method Details

.multi_jobs_per_fork?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/resque-multi-job-forks.rb', line 13

def self.multi_jobs_per_fork?
  ENV["DISABLE_MULTI_JOBS_PER_FORK"].nil?
end

Instance Method Details

#fork(&block) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/resque-multi-job-forks.rb', line 19

def fork(&block)
  if child = Kernel.fork
    return child
  else
    if term_child
      unregister_signal_handlers
      trap('QUIT') { shutdown }
    end
    raise NotImplementedError, "Pretending to not have forked"
    # perform_with_fork will run the job and continue working...
  end
end

#fork_hijacked?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/resque-multi-job-forks.rb', line 110

def fork_hijacked?
  @release_fork_limit
end

#fork_job_limitObject



132
133
134
# File 'lib/resque-multi-job-forks.rb', line 132

def fork_job_limit
  jobs_per_fork.nil? ? Time.now.to_f + seconds_per_fork : jobs_per_fork
end

#fork_job_limit_reached?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/resque-multi-job-forks.rb', line 136

def fork_job_limit_reached?
  fork_job_limit_remaining <= 0 || fork_job_over_memory_threshold?
end

#fork_job_limit_remainingObject



140
141
142
# File 'lib/resque-multi-job-forks.rb', line 140

def fork_job_limit_remaining
  jobs_per_fork.nil? ? @release_fork_limit - Time.now.to_f : jobs_per_fork - @jobs_processed
end

#fork_job_over_memory_threshold?Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/resque-multi-job-forks.rb', line 156

def fork_job_over_memory_threshold?
  !!(memory_threshold && rss > memory_threshold)
end

#hijack_forkObject



114
115
116
117
118
119
120
121
# File 'lib/resque-multi-job-forks.rb', line 114

def hijack_fork
  log_with_severity :debug, 'hijack fork.'
  @suppressed_fork_hooks = [Resque.after_fork, Resque.before_fork]
  Resque.after_fork = Resque.before_fork = nil
  @release_fork_limit = fork_job_limit
  @jobs_processed = 0
  @fork_per_job = false
end

#is_parent_process?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/resque-multi-job-forks.rb', line 101

def is_parent_process?
  @child || @pid == Process.pid
end

#minutes_per_forkObject



148
149
150
# File 'lib/resque-multi-job-forks.rb', line 148

def minutes_per_fork
  ENV['MINUTES_PER_FORK'].nil? ? 1 : ENV['MINUTES_PER_FORK'].to_i
end

#pause_processing_with_multi_job_forksObject Also known as: pause_processing



63
64
65
66
# File 'lib/resque-multi-job-forks.rb', line 63

def pause_processing_with_multi_job_forks
  shutdown_child
  pause_processing_without_multi_job_forks
end

#perform_with_multi_job_forks(job = nil) ⇒ Object Also known as: perform



40
41
42
43
44
45
# File 'lib/resque-multi-job-forks.rb', line 40

def perform_with_multi_job_forks(job = nil)
  @fork_per_job = true unless fork_hijacked? # reconnect and after_fork
  perform_without_multi_job_forks(job)
  hijack_fork unless fork_hijacked?
  @jobs_processed += 1
end

#reconnect_with_multi_job_forksObject Also known as: reconnect

Reconnect only once



78
79
80
81
82
83
# File 'lib/resque-multi-job-forks.rb', line 78

def reconnect_with_multi_job_forks
  unless @reconnected
    reconnect_without_multi_job_forks
    @reconnected = true
  end
end

#release_and_exit!Object



105
106
107
108
# File 'lib/resque-multi-job-forks.rb', line 105

def release_and_exit!
  release_fork if fork_hijacked?
  run_at_exit_hooks ? exit : exit!(true)
end

#release_forkObject



123
124
125
126
127
128
129
130
# File 'lib/resque-multi-job-forks.rb', line 123

def release_fork
  log_with_severity :info, "jobs processed by child: #{jobs_processed}; rss: #{rss}"
  run_hook :before_child_exit, self
  Resque.after_fork, Resque.before_fork = *@suppressed_fork_hooks
  @release_fork_limit = @jobs_processed = nil
  log_with_severity :debug, 'hijack over, counter terrorists win.'
  @shutdown = true
end

#shutdown_childObject

Need to tell the child to shutdown since it might be looping performing multiple jobs per fork. The QUIT signal normally does a graceful shutdown, and is re-registered in children (term_child normally unregisters it).



91
92
93
94
95
96
97
98
99
# File 'lib/resque-multi-job-forks.rb', line 91

def shutdown_child
  return unless @child
  begin
    log_with_severity :debug, "multi_jobs_per_fork: Sending QUIT signal to #{@child}"
    Process.kill('QUIT', @child)
  rescue Errno::ESRCH
    nil
  end
end

#shutdown_with_multi_job_forksObject Also known as: shutdown



56
57
58
59
# File 'lib/resque-multi-job-forks.rb', line 56

def shutdown_with_multi_job_forks
  shutdown_child
  shutdown_without_multi_job_forks
end

#shutdown_with_multi_job_forks?Boolean Also known as: shutdown?

Returns:

  • (Boolean)


49
50
51
52
# File 'lib/resque-multi-job-forks.rb', line 49

def shutdown_with_multi_job_forks?
  release_fork if fork_hijacked? && (fork_job_limit_reached? || @shutdown)
  shutdown_without_multi_job_forks?
end

#work_with_multi_job_forks(*args) ⇒ Object Also known as: work



32
33
34
35
36
# File 'lib/resque-multi-job-forks.rb', line 32

def work_with_multi_job_forks(*args)
  pid # forces @pid to be set in the parent
  work_without_multi_job_forks(*args)
  release_and_exit! unless is_parent_process?
end

#working_on_with_worker_registration(job) ⇒ Object Also known as: working_on



70
71
72
73
# File 'lib/resque-multi-job-forks.rb', line 70

def working_on_with_worker_registration(job)
  register_worker
  working_on_without_worker_registration(job)
end