Module: ProcessBalancer

Defined in:
lib/process_balancer.rb,
lib/process_balancer/cli.rb,
lib/process_balancer/base.rb,
lib/process_balancer/util.rb,
lib/process_balancer/rails.rb,
lib/process_balancer/worker.rb,
lib/process_balancer/manager.rb,
lib/process_balancer/version.rb,
lib/process_balancer/watcher.rb,
lib/process_balancer/redis_connection.rb,
lib/process_balancer/lock/simple_redis.rb,
lib/process_balancer/lock/advisory_lock.rb,
lib/process_balancer/private/cancellation.rb

Overview

The cancellation support in lib/process_balancer/private/cancellation.rb is a private copy of Concurrent::Cancellation from concurrent-ruby-edge, included so that we do not depend on the edge gem.

Defined Under Namespace

Modules: Lock, Private, RedisConnection, Util
Classes: Base, CLI, Error, Manager, Rails, Watcher, Worker

Constant Summary collapse

# Redis list key read by .running_workers; each entry is used as the prefix of
# a "<entry>:workers" hash key.
PROCESSES_KEY =
'processes'
# Redis hash key whose fields are job ids (as strings) and whose values are the
# scheduled worker counts written by .adjust_scheduled_workers.
WORKER_COUNT_KEY =
'worker_counts'
# Frozen default option set; .options starts from a copy of it.
DEFAULTS =
{
    # Handed to RedisConnection.create by .redis_pool.
    redis:            {},
    job_sets:         [],
    require:          '.',
    max_threads:      10,
    shutdown_timeout: 30,
    # The default reloader simply invokes the block it is given.
    reloader:         proc { |&block| block.call },
}.freeze
# Library version string.
VERSION =
'1.0.0'

Class Method Summary collapse

Class Method Details

.adjust_scheduled_workers(job_id, by: nil, to: nil) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/process_balancer.rb', line 102

# Changes the scheduled worker count stored for a job in the WORKER_COUNT_KEY
# Redis hash.
#
# @param job_id [#to_s] job identifier; stringified to form the hash field
# @param by [Integer, nil] relative change, applied with HINCRBY
# @param to [Integer, nil] absolute value, applied with HSET; takes precedence
#   over +by+ when both are supplied
# @return [Object] whatever the Redis command returns
# @raise [ArgumentError] when neither +by+ nor +to+ is given
def self.adjust_scheduled_workers(job_id, by: nil, to: nil)
  if to.nil? && by.nil?
    raise ArgumentError, 'Must specify either by: (an increment/decrement) or to: (an exact value)'
  end

  redis do |conn|
    if to.nil?
      conn.hincrby(WORKER_COUNT_KEY, job_id.to_s, by)
    else
      conn.hset(WORKER_COUNT_KEY, job_id.to_s, to)
    end
  end
end

.configure {|_self| ... } ⇒ Object

Configuration for ProcessBalancer, use like:

ProcessBalancer.configure do |config|
  config.redis = { :namespace => 'myapp', :size => 25, :url => 'redis://myhost:8877/0' }
  if config.server?
   # any configuration specific to server
  end
end

Yields:

  • (_self)

Yield Parameters:

  • _self (ProcessBalancer) — the object that the method was called on



43
44
45
# File 'lib/process_balancer.rb', line 43

# Yields the receiver to the given block so callers can set configuration
# on it (for example +config.redis = {...}+ or checking +config.server?+).
#
# @yieldparam _self the object this method was called on
# @return [Object] the value returned by the block
def self.configure
  yield self
end

.hostname ⇒ Object



90
91
92
# File 'lib/process_balancer.rb', line 90

# Name used to identify the host this process runs on.
#
# The DYNO environment variable wins when it is set (presumably the Heroku
# dyno name -- confirm); otherwise the operating system hostname is used.
#
# @return [String]
def self.hostname
  ENV.fetch('DYNO') { Socket.gethostname }
end

.identity ⇒ Object



98
99
100
# File 'lib/process_balancer.rb', line 98

# Memoized identifier for this process, built as "<hostname>:<pid>:<nonce>".
#
# The value is computed once and cached in @identity until .reset clears it.
# NOTE(review): $PID is only populated when the 'English' library has been
# required -- confirm the file does so.
#
# @return [String]
def self.identity
  return @identity if @identity

  @identity = [hostname, $PID, process_nonce].join(':')
end

.logger ⇒ Object



51
52
53
# File 'lib/process_balancer.rb', line 51

# Shared logger, created on first use and cached in @logger.
#
# The default instance writes to STDOUT at INFO level.
#
# @return [Logger]
def self.logger
  return @logger if @logger

  @logger = Logger.new(STDOUT, level: Logger::INFO)
end

.options ⇒ Object



26
27
28
# File 'lib/process_balancer.rb', line 26

# Current option hash, lazily initialised from DEFAULTS and cached in @options.
#
# DEFAULTS is frozen only at the top level, so a plain +dup+ would hand out the
# very same nested objects (e.g. the +redis+ hash and +job_sets+ array).
# Mutating them in place would then silently alter DEFAULTS and outlive .reset.
# Each value is therefore duplicated one level deep.
#
# @return [Hash] a mutable hash with the same keys as DEFAULTS
def self.options
  @options ||= DEFAULTS.transform_values(&:dup)
end

.options=(opts) ⇒ Object



30
31
32
# File 'lib/process_balancer.rb', line 30

# Replaces the cached option hash wholesale; no merging with DEFAULTS and no
# validation is performed here.
#
# @param opts [Hash] the new options (stored as-is in @options)
def self.options=(opts)
  @options = opts
end

.process_nonce ⇒ Object



94
95
96
# File 'lib/process_balancer.rb', line 94

# Random token generated once and cached in @process_nonce until .reset.
#
# @return [String] 12 hexadecimal characters (6 random bytes)
def self.process_nonce
  return @process_nonce if @process_nonce

  @process_nonce = SecureRandom.hex(6)
end

.redis ⇒ Object

Raises:

  • (ArgumentError)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/process_balancer.rb', line 55

# Checks a connection out of redis_pool and yields it to the caller's block.
#
# When the block raises a Redis::CommandError whose message contains
# "READONLY", the connection is disconnected and the block is run again --
# at most once. Any other error, or a second READONLY failure, is re-raised.
#
# @yieldparam conn the checked-out Redis connection
# @return [Object] whatever redis_pool.with returns for the block
# @raise [ArgumentError] when called without a block
def self.redis
  raise ArgumentError, 'requires a block' unless block_given?

  redis_pool.with do |conn|
    already_reconnected = false
    begin
      yield conn
    rescue Redis::CommandError => e
      raise if already_reconnected || !e.message.include?('READONLY')

      # we are talking to a read-only replica: drop the connection so the
      # next attempt reopens it against the master
      conn.disconnect!
      already_reconnected = true
      retry
    end
  end
end

.redis=(hash) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/process_balancer.rb', line 74

# Replaces the shared Redis connection pool.
#
# @param hash [ConnectionPool, Object] a ConnectionPool is stored as-is; any
#   other value is passed to RedisConnection.create and the result is stored
def self.redis=(hash)
  @redis_pool = hash.is_a?(ConnectionPool) ? hash : RedisConnection.create(hash)
end

.redis_pool ⇒ Object



70
71
72
# File 'lib/process_balancer.rb', line 70

# Shared Redis connection pool, built on first use from options[:redis] via
# RedisConnection.create and cached in @redis_pool.
#
# @return [Object] the pool returned by RedisConnection.create (or the one
#   previously assigned to @redis_pool)
def self.redis_pool
  return @redis_pool if @redis_pool

  @redis_pool = RedisConnection.create(options[:redis])
end

.reset ⇒ Object



82
83
84
85
86
87
88
# File 'lib/process_balancer.rb', line 82

# Clears every memoized value (connection pool, options, logger, process
# nonce and identity) so each is rebuilt lazily on its next access.
#
# @return [nil]
def self.reset
  @redis_pool = @options = @logger = @process_nonce = @identity = nil
end

.running_workers(job_id) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/process_balancer.rb', line 117

# Counts the workers currently running for a job across all registered
# processes.
#
# Every entry of the PROCESSES_KEY list names a process; that process's
# "<entry>:workers" hash holds one JSON document per job id. The size of the
# document's +running+ member is added to the total. Processes with no entry
# for the job, with malformed JSON, or with JSON that is not an object
# contribute 0.
#
# @param job_id [#to_s] job identifier; stringified to form the hash field,
#   matching .scheduled_workers and .adjust_scheduled_workers
# @return [Integer] total number of running workers for the job
def self.running_workers(job_id)
  field = job_id.to_s
  total = 0

  redis do |c|
    # Assign (rather than accumulate into) the outer variable: .redis may run
    # this block a second time after a READONLY reconnect, and a partial tally
    # from the failed attempt must not be carried over.
    total = c.lrange(PROCESSES_KEY, 0, -1).sum do |worker|
      payload = c.hget("#{worker}:workers", field)
      next 0 unless payload

      data = JSON.parse(payload, symbolize_names: true)
      # Valid JSON need not be an object (e.g. "null"); skip it just like
      # malformed JSON instead of failing the whole count.
      next 0 unless data.is_a?(Hash)

      data[:running]&.size || 0
    rescue JSON::ParserError
      0
    end
  end

  total
end

.scheduled_workers(job_id) ⇒ Object



112
113
114
115
# File 'lib/process_balancer.rb', line 112

# Number of workers scheduled for a job, read from the WORKER_COUNT_KEY hash.
#
# @param job_id [#to_s] job identifier; stringified to form the hash field
# @return [Integer] the stored count converted with +to_i+, or 1 when nothing
#   is stored for the job
def self.scheduled_workers(job_id)
  stored = redis { |conn| conn.hget(WORKER_COUNT_KEY, job_id.to_s) }
  return 1 if stored.nil?

  stored.to_i
end

.server? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/process_balancer.rb', line 47

# Whether the ProcessBalancer::CLI constant is defined in this process.
#
# +defined?+ evaluates to the String "expression" or to nil; the result is
# converted so the method returns a real Boolean, as its documentation states.
# Truthiness is the same as before for every caller.
#
# @return [Boolean] true when ProcessBalancer::CLI is defined, false otherwise
def self.server?
  defined?(ProcessBalancer::CLI) ? true : false
end