Module: ProcessBalancer
- Defined in:
- lib/process_balancer.rb,
lib/process_balancer/cli.rb,
lib/process_balancer/base.rb,
lib/process_balancer/util.rb,
lib/process_balancer/rails.rb,
lib/process_balancer/worker.rb,
lib/process_balancer/manager.rb,
lib/process_balancer/version.rb,
lib/process_balancer/watcher.rb,
lib/process_balancer/redis_connection.rb,
lib/process_balancer/lock/simple_redis.rb,
lib/process_balancer/lock/advisory_lock.rb,
lib/process_balancer/private/cancellation.rb
Overview
This is a private copy of Concurrent::Cancellation from concurrent-ruby-edge so we do not depend on the edge gem
Defined Under Namespace
Modules: Lock, Private, RedisConnection, Util
Classes: Base, CLI, Error, Manager, Rails, Watcher, Worker
Constant Summary
collapse
- PROCESSES_KEY =
'processes'
- WORKER_COUNT_KEY =
'worker_counts'
- DEFAULTS =
{
redis: {},
job_sets: [],
require: '.',
max_threads: 10,
shutdown_timeout: 30,
reloader: proc { |&block| block.call },
}.freeze
- VERSION =
'1.0.0'
Class Method Summary
collapse
Class Method Details
.adjust_scheduled_workers(job_id, by: nil, to: nil) ⇒ Object
102
103
104
105
106
107
108
109
110
|
# File 'lib/process_balancer.rb', line 102
def self.adjust_scheduled_workers(job_id, by: nil, to: nil)
if !to.nil?
redis { |c| c.hset(WORKER_COUNT_KEY, job_id.to_s, to) }
elsif !by.nil?
redis { |c| c.hincrby(WORKER_COUNT_KEY, job_id.to_s, by) }
else
raise ArgumentError, 'Must specify either by: (an increment/decrement) or to: (an exact value)'
end
end
|
Configuration for ProcessBalancer, use like:
ProcessBalancer.configure do |config|
config.redis = { :namespace => 'myapp', :size => 25, :url => 'redis://myhost:8877/0' }
if config.server?
end
end
43
44
45
|
# File 'lib/process_balancer.rb', line 43
def self.configure
yield self
end
|
.hostname ⇒ Object
90
91
92
|
# File 'lib/process_balancer.rb', line 90
def self.hostname
ENV['DYNO'] || Socket.gethostname
end
|
.identity ⇒ Object
98
99
100
|
# File 'lib/process_balancer.rb', line 98
def self.identity
@identity ||= "#{hostname}:#{$PID}:#{process_nonce}"
end
|
.logger ⇒ Object
51
52
53
|
# File 'lib/process_balancer.rb', line 51
def self.logger
@logger ||= Logger.new(STDOUT, level: Logger::INFO)
end
|
.options ⇒ Object
26
27
28
|
# File 'lib/process_balancer.rb', line 26
def self.options
@options ||= DEFAULTS.dup
end
|
.options=(opts) ⇒ Object
30
31
32
|
# File 'lib/process_balancer.rb', line 30
def self.options=(opts)
@options = opts
end
|
.process_nonce ⇒ Object
94
95
96
|
# File 'lib/process_balancer.rb', line 94
def self.process_nonce
@process_nonce ||= SecureRandom.hex(6)
end
|
.redis ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/process_balancer.rb', line 55
def self.redis
raise ArgumentError, 'requires a block' unless block_given?
redis_pool.with do |conn|
retryable = true
begin
yield conn
rescue Redis::CommandError => e
(conn.disconnect!; retryable = false; retry) if retryable && e.message =~ /READONLY/
raise
end
end
end
|
.redis=(hash) ⇒ Object
74
75
76
77
78
79
80
|
# File 'lib/process_balancer.rb', line 74
def self.redis=(hash)
@redis_pool = if hash.is_a?(ConnectionPool)
hash
else
RedisConnection.create(hash)
end
end
|
.redis_pool ⇒ Object
70
71
72
|
# File 'lib/process_balancer.rb', line 70
def self.redis_pool
@redis_pool ||= RedisConnection.create(options[:redis])
end
|
.reset ⇒ Object
82
83
84
85
86
87
88
|
# File 'lib/process_balancer.rb', line 82
def self.reset
@redis_pool = nil
@options = nil
@logger = nil
@process_nonce = nil
@identity = nil
end
|
.running_workers(job_id) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/process_balancer.rb', line 117
def self.running_workers(job_id)
count = 0
redis do |c|
workers = c.lrange(PROCESSES_KEY, 0, -1)
workers.each do |worker|
data = c.hget("#{worker}:workers", job_id)
if data
data = JSON.parse(data, symbolize_names: true)
count += (data.dig(:running)&.size || 0)
end
rescue JSON::ParserError
nil
end
end
count
end
|
.scheduled_workers(job_id) ⇒ Object
112
113
114
115
|
# File 'lib/process_balancer.rb', line 112
def self.scheduled_workers(job_id)
value = redis { |c| c.hget(WORKER_COUNT_KEY, job_id.to_s) }&.to_i
value.nil? ? 1 : value
end
|
.server? ⇒ Boolean
47
48
49
|
# File 'lib/process_balancer.rb', line 47
def self.server?
defined?(ProcessBalancer::CLI)
end
|