Class: Restrainer
- Inherits: Object
- Defined in: lib/restrainer.rb
Overview
Redis-backed throttling mechanism that ensures only a limited number of processes can execute at any one time.
Usage:
Restrainer.new(:foo, limit: 10).throttle do
  # Do something
end
If the specified number of processes, as identified by the name argument, is already running, the throttle block raises a Restrainer::ThrottledError instead of executing.
Defined Under Namespace
Classes: ThrottledError
Constant Summary
- ADD_PROCESS_SCRIPT =
  <<-LUA
    -- Parse arguments
    local sorted_set = ARGV[1]
    local process_id = ARGV[2]
    local limit = tonumber(ARGV[3])
    local ttl = tonumber(ARGV[4])
    local now = tonumber(ARGV[5])

    -- Get count of current processes. If more than the max, check if any of the processes have timed out
    -- and try again.
    local process_count = redis.call('zcard', sorted_set)
    if process_count >= limit then
      local max_score = now - ttl
      local expired_keys = redis.call('zremrangebyscore', sorted_set, '-inf', max_score)
      if expired_keys > 0 then
        process_count = redis.call('zcard', sorted_set)
      end
    end

    -- Success so add to the list and set a global expiration so the list cleans up after itself.
    if process_count < limit then
      redis.call('zadd', sorted_set, now, process_id)
      redis.call('expire', sorted_set, ttl)
    end

    -- Return the number of processes running before the process was added.
    return process_count
  LUA
Instance Attribute Summary
- #limit ⇒ Object (readonly): Returns the value of attribute limit.
- #name ⇒ Object (readonly): Returns the value of attribute name.
Class Method Summary
- .redis(&block) ⇒ Redis: Either configure the Redis instance using a block or return the configured instance.
- .redis=(conn) ⇒ void: Set the Redis instance to a specific instance.
Instance Method Summary
- #clear! ⇒ void: Clear all locks.
- #current ⇒ Integer: Get the number of processes currently being executed for this restrainer.
- #initialize(name, limit:, timeout: 60, redis: nil) ⇒ Restrainer (constructor): Create a new restrainer.
- #lock!(process_id = nil, limit: nil) ⇒ String: Obtain a lock on one of the allowed processes.
- #release!(process_id) ⇒ Boolean: Release one of the allowed processes.
- #throttle(limit: nil) ⇒ void: Wrap a block with this method to throttle concurrent execution.
Constructor Details
#initialize(name, limit:, timeout: 60, redis: nil) ⇒ Restrainer
Create a new restrainer. The name is used to identify the Restrainer and group processes together. You can create any number of Restrainers with different names.
The required limit parameter specifies the maximum number of processes that will be allowed to execute the throttle block at any point in time.
The timeout parameter is used for cleaning up internal data structures so that jobs aren’t orphaned if their process is killed. Processes are automatically removed from the running jobs list after the specified number of seconds. Note that the Restrainer does not time out any code itself; this value is only used to ensure the integrity of internal data structures.
# File 'lib/restrainer.rb', line 102
def initialize(name, limit:, timeout: 60, redis: nil)
  @name = name
  @limit = limit
  @timeout = timeout
  @key = "#{self.class.name}.#{name}"
  @redis = redis
end
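The timeout-based cleanup described above can be modeled without Redis. The following is a minimal sketch, assuming a plain Hash (member => score) in place of the sorted set. The add_process method and its arguments are hypothetical names that follow the steps of ADD_PROCESS_SCRIPT; this is not the gem's implementation, and the key-level expire call is left out.

```ruby
# In-memory model of the admission logic in ADD_PROCESS_SCRIPT.
# `sorted_set` is a plain Hash standing in for a Redis sorted set
# (member => score). It is hypothetical and not part of the gem.
def add_process(sorted_set, process_id, limit, ttl, now)
  # Equivalent of ZCARD: count the processes currently registered.
  process_count = sorted_set.size

  if process_count >= limit
    # Equivalent of ZREMRANGEBYSCORE from -inf to (now - ttl), inclusive:
    # drop entries whose timestamp is older than the timeout.
    max_score = now - ttl
    expired = sorted_set.select { |_id, score| score <= max_score }.keys
    expired.each { |id| sorted_set.delete(id) }
    process_count = sorted_set.size if expired.any?
  end

  # Equivalent of ZADD: register the process only when a slot is free.
  sorted_set[process_id] = now if process_count < limit

  # The count observed before this process was added.
  process_count
end

running = {}
first  = add_process(running, "a", 2, 60, 1000) # admitted
second = add_process(running, "b", 2, 60, 1010) # admitted
third  = add_process(running, "c", 2, 60, 1020) # at the limit, nothing expired
fourth = add_process(running, "d", 2, 60, 1065) # "a" is older than 60s and is evicted
```

In this model a return value equal to or greater than the limit means the process was not admitted. Expired entries are only evicted when the set is already full, which matches the order of operations in the script.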
Instance Attribute Details
#limit ⇒ Object (readonly)
Returns the value of attribute limit.
# File 'lib/restrainer.rb', line 17
def limit
  @limit
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/restrainer.rb', line 17
def name
  @name
end
Class Method Details
.redis(&block) ⇒ Redis
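Either configure the Redis instance using a block or, when called without a block, return the configured instance. Configuring with a block lets you use things like connection pools without hard-coding a single instance. If nothing has been configured, a default Redis.new client is created on first use.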
# File 'lib/restrainer.rb', line 66
def redis(&block)
  if block
    @redis = block
  else
    unless @redis
      client = Redis.new
      @redis = lambda { client }
    end
    @redis.call
  end
end
.redis=(conn) ⇒ void
This method returns an undefined value.
Set the Redis instance to a specific instance. It is usually preferable to use the block form to configure the instance so that it is evaluated at runtime.
# File 'lib/restrainer.rb', line 86
def redis=(conn)
  @redis = lambda { conn }
end
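The difference between the block form and the setter can be seen in isolation. The following is a minimal sketch, assuming a hypothetical SketchConfig class that copies the accessor pattern shown above. It performs no Redis I/O, uses strings and a bare Object as stand-in connections, and raises when nothing is configured instead of creating a default client.

```ruby
# Hypothetical stand-in that mirrors the accessor pattern documented above.
# It is not the gem's class and never talks to Redis.
class SketchConfig
  class << self
    # With a block: store it. Without one: evaluate the stored block
    # and return its result.
    def redis(&block)
      if block
        @redis = block
      else
        raise "no connection configured" unless @redis
        @redis.call
      end
    end

    # Pin one specific instance by wrapping it in a lambda.
    def redis=(conn)
      @redis = lambda { conn }
    end
  end
end

# Block form: the block runs on every lookup, so it can hand out a
# different (for example pooled) connection each time.
calls = 0
SketchConfig.redis { calls += 1; "connection-#{calls}" }
block_first  = SketchConfig.redis
block_second = SketchConfig.redis

# Setter form: every lookup returns the same object.
fixed = Object.new
SketchConfig.redis = fixed
setter_first  = SketchConfig.redis
setter_second = SketchConfig.redis
```

Because the block is called on each lookup, whatever it returns is decided at the time of use rather than at configuration time, which is the property the block form is recommended for.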
Instance Method Details
#clear! ⇒ void
This method returns an undefined value.
Clear all locks.
# File 'lib/restrainer.rb', line 175
def clear!
  redis.del(key)
end
#current ⇒ Integer
Get the number of processes currently being executed for this restrainer.
# File 'lib/restrainer.rb', line 168
def current
  redis.zcard(key).to_i
end
#lock!(process_id = nil, limit: nil) ⇒ String
Obtain a lock on one of the allowed processes. The method returns a process identifier that must be passed to release! to release the lock. You can pass in a unique identifier if you already have one. When the limit is negative, no lock is taken and nil is returned.
# File 'lib/restrainer.rb', line 142
def lock!(process_id = nil, limit: nil)
  process_id ||= SecureRandom.uuid
  limit ||= self.limit

  # A limit less than zero means no limit; a limit of zero allows none.
  return nil if limit < 0
  raise ThrottledError.new("#{self.class}: #{@name} is not allowing any processing") if limit == 0

  add_process!(redis, process_id, limit)
  process_id
end
#release!(process_id) ⇒ Boolean
Release one of the allowed processes. You must pass in a process id returned by lock!.
# File 'lib/restrainer.rb', line 159
def release!(process_id)
  return false if process_id.nil?
  remove_process!(redis, process_id)
end
#throttle(limit: nil) ⇒ void
This method returns an undefined value.
Wrap a block with this method to throttle concurrent execution. If the allotted number of processes (as identified by the name) is already executing, a Restrainer::ThrottledError is raised.
# File 'lib/restrainer.rb', line 118
def throttle(limit: nil)
  limit ||= self.limit

  # A limit less than zero means no limit; a limit of zero allows none.
  return yield if limit < 0

  process_id = lock!(limit: limit)
  begin
    yield
  ensure
    release!(process_id)
  end
end
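The control flow of throttle, lock! and release! can be exercised without a Redis server. The following is a minimal sketch, assuming a hypothetical SketchRestrainer class that tracks process ids in a Hash and ignores timeouts. It also assumes the limit check raises ThrottledError from inside lock!, which the source delegates to add_process! and does not show.

```ruby
# Hypothetical in-memory stand-in for the documented interface. It keeps
# process ids in a Hash instead of Redis and ignores timeouts entirely.
class SketchRestrainer
  class ThrottledError < StandardError; end

  attr_reader :limit

  def initialize(limit:)
    @limit = limit
    @running = {}
    @next_id = 0
  end

  def current
    @running.size
  end

  def lock!(limit: nil)
    limit ||= self.limit
    return nil if limit < 0 # negative limit: no throttling, no lock taken
    raise ThrottledError, "not allowing any processing" if limit == 0
    # Assumption: the real check lives in add_process!, not shown in the source.
    raise ThrottledError, "limit of #{limit} reached" if current >= limit
    process_id = "process-#{@next_id += 1}"
    @running[process_id] = true
    process_id
  end

  def release!(process_id)
    return false if process_id.nil?
    !@running.delete(process_id).nil?
  end

  def throttle(limit: nil)
    limit ||= self.limit
    return yield if limit < 0
    process_id = lock!(limit: limit)
    begin
      yield
    ensure
      release!(process_id) # runs even when the block raises
    end
  end
end

restrainer = SketchRestrainer.new(limit: 1)

# A nested call needs a second slot while the first is held, so it is rejected.
nested = restrainer.throttle do
  begin
    restrainer.throttle { :ran }
  rescue SketchRestrainer::ThrottledError
    :throttled
  end
end

# The slot is released even though the block raised.
begin
  restrainer.throttle { raise ArgumentError, "boom" }
rescue ArgumentError
  after_error = restrainer.current
end

# A per-call negative limit bypasses throttling; zero rejects everything.
unlimited = restrainer.throttle(limit: -1) { :ran }
blocked = begin
  restrainer.throttle(limit: 0) { :ran }
rescue SketchRestrainer::ThrottledError
  :throttled
end
```

The ensure clause is what keeps a failing block from holding its slot. With the real class, the timeout only covers processes that die before ensure can run.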