Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
Plugins::MultiJobForks::RssReader
Defined in:
lib/resque-multi-job-forks.rb

Constant Summary

Constants included from Plugins::MultiJobForks::RssReader

Plugins::MultiJobForks::RssReader::LINUX, Plugins::MultiJobForks::RssReader::PS_CMD, Plugins::MultiJobForks::RssReader::UNITS

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Plugins::MultiJobForks::RssReader

#rss, #rss_linux, #rss_posix

Instance Attribute Details

#jobs_per_forkObject

Returns the value of attribute jobs_per_fork.



9
10
11
# File 'lib/resque-multi-job-forks.rb', line 9

def jobs_per_fork
  @jobs_per_fork
end

#jobs_processedObject (readonly)

Returns the value of attribute jobs_processed.



11
12
13
# File 'lib/resque-multi-job-forks.rb', line 11

def jobs_processed
  @jobs_processed
end

#memory_thresholdObject

Returns the value of attribute memory_threshold.



10
11
12
# File 'lib/resque-multi-job-forks.rb', line 10

def memory_threshold
  @memory_threshold
end

#seconds_per_forkObject

Returns the value of attribute seconds_per_fork.



8
9
10
# File 'lib/resque-multi-job-forks.rb', line 8

def seconds_per_fork
  @seconds_per_fork
end

Class Method Details

.multi_jobs_per_fork?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/resque-multi-job-forks.rb', line 13

def self.multi_jobs_per_fork?
  ENV["DISABLE_MULTI_JOBS_PER_FORK"].nil?
end

Instance Method Details

#fork_hijacked?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/resque-multi-job-forks.rb', line 79

def fork_hijacked?
  @release_fork_limit
end

#fork_job_limitObject



102
103
104
# File 'lib/resque-multi-job-forks.rb', line 102

def fork_job_limit
  jobs_per_fork.nil? ? Time.now.to_i + seconds_per_fork : jobs_per_fork
end

#fork_job_limit_reached?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/resque-multi-job-forks.rb', line 106

def fork_job_limit_reached?
  fork_job_limit_remaining <= 0 || fork_job_over_memory_threshold?
end

#fork_job_limit_remainingObject



110
111
112
# File 'lib/resque-multi-job-forks.rb', line 110

def fork_job_limit_remaining
  jobs_per_fork.nil? ? @release_fork_limit - Time.now.to_i : jobs_per_fork - @jobs_processed
end

#fork_job_over_memory_threshold?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/resque-multi-job-forks.rb', line 126

def fork_job_over_memory_threshold?
  !!(memory_threshold && rss > memory_threshold)
end

#hijack_forkObject



83
84
85
86
87
88
89
90
91
# File 'lib/resque-multi-job-forks.rb', line 83

def hijack_fork
  log 'hijack fork.'
  @suppressed_fork_hooks = [Resque.after_fork, Resque.before_fork]
  Resque.after_fork = Resque.before_fork = nil
  @release_fork_limit = fork_job_limit
  @jobs_processed = 0
  @cant_fork = true
  trap('TSTP') { shutdown }
end

#is_parent_process?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/resque-multi-job-forks.rb', line 75

def is_parent_process?
  @child
end

#minutes_per_forkObject



118
119
120
# File 'lib/resque-multi-job-forks.rb', line 118

def minutes_per_fork
  ENV['MINUTES_PER_FORK'].nil? ? 1 : ENV['MINUTES_PER_FORK'].to_i
end

#pause_processing_with_multi_job_forksObject Also known as: pause_processing



40
41
42
43
# File 'lib/resque-multi-job-forks.rb', line 40

def pause_processing_with_multi_job_forks
  pause_processing_without_multi_job_forks
  shutdown_child if is_parent_process?
end

#perform_with_multi_job_forks(job = nil) ⇒ Object Also known as: perform



18
19
20
21
22
# File 'lib/resque-multi-job-forks.rb', line 18

def perform_with_multi_job_forks(job = nil)
  perform_without_multi_job_forks(job)
  hijack_fork unless fork_hijacked?
  @jobs_processed += 1
end

#reconnect_with_multi_job_forksObject Also known as: reconnect

Reconnect only once



55
56
57
58
59
60
# File 'lib/resque-multi-job-forks.rb', line 55

def reconnect_with_multi_job_forks
  unless @reconnected
    reconnect_without_multi_job_forks
    @reconnected = true
  end
end

#release_forkObject



93
94
95
96
97
98
99
100
# File 'lib/resque-multi-job-forks.rb', line 93

def release_fork
  log "jobs processed by child: #{jobs_processed}; rss: #{rss}"
  run_hook :before_child_exit, self
  Resque.after_fork, Resque.before_fork = *@suppressed_fork_hooks
  @release_fork_limit = @jobs_processed = @cant_fork = nil
  log 'hijack over, counter terrorists win.'
  @shutdown = true unless $TESTING
end

#shutdown_childObject

Need to tell the child to shutdown since it might be looping performing multiple jobs per fork The TSTP signal is registered only in forked processes and calls this function



67
68
69
70
71
72
73
# File 'lib/resque-multi-job-forks.rb', line 67

def shutdown_child
  begin
    Process.kill('TSTP', @child)
  rescue Errno::ESRCH
    nil
  end
end

#shutdown_with_multi_job_forksObject Also known as: shutdown



33
34
35
36
# File 'lib/resque-multi-job-forks.rb', line 33

def shutdown_with_multi_job_forks
  shutdown_without_multi_job_forks
  shutdown_child if is_parent_process?
end

#shutdown_with_multi_job_forks?Boolean Also known as: shutdown?

Returns:

  • (Boolean)


26
27
28
29
# File 'lib/resque-multi-job-forks.rb', line 26

def shutdown_with_multi_job_forks?
  release_fork if fork_hijacked? && (fork_job_limit_reached? || @shutdown)
  shutdown_without_multi_job_forks?
end

#working_on_with_worker_registration(job) ⇒ Object Also known as: working_on



47
48
49
50
# File 'lib/resque-multi-job-forks.rb', line 47

def working_on_with_worker_registration(job)
  register_worker
  working_on_without_worker_registration(job)
end