Class: Upperkut::Strategies::PriorityQueue
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/priority_queue.rb
Overview
Public: Queue that prevents a single tenant from taking over.
Constant Summary
- ONE_DAY_IN_SECONDS =
86400
- ENQUEUE_ITEM =
Logic as follows:
We keep the last score used for each tenant key. A tenant_key is a tenant's unique id. To calculate next_score we use max(current_tenant_score, current_global_score) + increment. We store the queue in a sorted set, using next_score as the ordering key. If one tenant sends lots of items, that tenant ends up with lots of items in the queue, spaced by increment. If another tenant then sends an item, its previous tenant score is lower than the first tenant's, so the item is inserted ahead of the first tenant's higher-scored items.
In other words, the idea of this queue is to keep a tenant that sends a lot of items from dominating processing, and to give tenants that send few items a fair share of processing time.
%(
  local increment = 1
  local current_checkpoint = tonumber(redis.call("GET", KEYS[1])) or 0
  local score_key = KEYS[2]
  local current_score = tonumber(redis.call("GET", score_key)) or 0
  local queue_key = KEYS[3]
  local next_score = nil

  if current_score >= current_checkpoint then
    next_score = current_score + increment
  else
    next_score = current_checkpoint + increment
  end

  redis.call("SETEX", score_key, #{ONE_DAY_IN_SECONDS}, next_score)
  redis.call("ZADD", queue_key, next_score, ARGV[1])

  return next_score
).freeze
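The scoring rule can be tried out without Redis. The following is a minimal in-memory sketch, not the library's implementation: the class name FairQueueSketch, its methods, and the tenant and item names are all hypothetical, and a plain Array stands in for the sorted set.

```ruby
# In-memory stand-in for the ENQUEUE_ITEM scoring rule; no Redis involved.
# FairQueueSketch and its method names are hypothetical.
class FairQueueSketch
  INCREMENT = 1

  attr_accessor :checkpoint

  def initialize
    @checkpoint = 0               # score of the last dequeued item (KEYS[1])
    @tenant_scores = Hash.new(0)  # last score given to each tenant (KEYS[2])
    @queue = []                   # [score, item] pairs, stands in for the sorted set (KEYS[3])
  end

  # next_score = max(tenant score, checkpoint) + increment
  def enqueue(tenant, item)
    next_score = [@tenant_scores[tenant], @checkpoint].max + INCREMENT
    @tenant_scores[tenant] = next_score
    @queue << [next_score, item]
    next_score
  end

  # Ascending score order; equal scores fall back to comparing the items,
  # similar to how a sorted set orders members that share a score.
  def ordered_items
    @queue.sort.map { |_score, item| item }
  end
end

queue = FairQueueSketch.new
3.times { |i| queue.enqueue('tenant_a', "a#{i + 1}") } # scores 1, 2, 3
queue.enqueue('tenant_b', 'b1')                         # score 1
p queue.ordered_items # => ["a1", "b1", "a2", "a3"]
```

Although tenant_b enqueued last, its item is served second, because its score starts from the checkpoint rather than from the end of tenant_a's backlog.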
- DEQUEUE_ITEM =
Uses ZPOP* functions available only on Redis 5.0.0+
%(
  local checkpoint_key = KEYS[1]
  local queue_key = KEYS[2]
  local batch_size = ARGV[1]
  local popped_items = redis.call("ZPOPMIN", queue_key, batch_size)
  local items = {}
  local last_score = 0

  for i, v in ipairs(popped_items) do
    if i % 2 == 1 then
      table.insert(items, v)
    else
      last_score = v
    end
  end

  redis.call("SETEX", checkpoint_key, 86400, last_score)

  return items
).freeze
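The loop in DEQUEUE_ITEM treats the ZPOPMIN reply as a flat list in which odd positions hold members and even positions hold their scores. A rough Ruby equivalent of that split is sketched below; the helper name split_popped and the sample reply are made up for illustration.

```ruby
# Mirrors the Lua loop: separate members from scores in a flat
# [member, score, member, score, ...] reply and remember the last score.
# split_popped is a hypothetical helper, not part of the library.
def split_popped(popped)
  items = []
  last_score = 0 # same default the script uses when nothing was popped

  popped.each_slice(2) do |member, score|
    items << member
    last_score = score
  end

  [items, last_score]
end

reply = ['{"id":1}', '1', '{"id":2}', '1', '{"id":3}', '2'] # made-up reply
items, checkpoint = split_popped(reply)
p items      # => ["{\"id\":1}", "{\"id\":2}", "{\"id\":3}"]
p checkpoint # => "2"
```

The last score becomes the new checkpoint, which is what the enqueue script reads as the floor for the next score.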
Instance Method Summary
- #clear ⇒ Object
Public: Clear all data related to the strategy.
- #fetch_items ⇒ Object
Public: Retrieve events from Strategy.
- #initialize(worker, options) ⇒ PriorityQueue
constructor
Returns a new instance of PriorityQueue.
- #metrics ⇒ Object
Public: Consolidated strategy metrics.
- #process? ⇒ Boolean
Public: Tells when to execute event processing. When this condition is met, the events are dispatched to the worker.
- #push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
Methods included from Util
#decode_json_items, #encode_json_items, #to_underscore
Constructor Details
#initialize(worker, options) ⇒ PriorityQueue
Returns a new instance of PriorityQueue.
# File 'lib/upperkut/strategies/priority_queue.rb', line 65
def initialize(worker, options)
  @worker = worker
  @options = options
  @priority_key = options.fetch(:priority_key)
  @redis_options = options.fetch(:redis, {})

  @max_wait = options.fetch(
    :max_wait,
    Integer(ENV['UPPERKUT_MAX_WAIT'] || 20)
  )

  @batch_size = options.fetch(
    :batch_size,
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  )

  @waiting_time = 0

  raise ArgumentError, 'Invalid priority_key. ' \
    'Must be a lambda' unless @priority_key.respond_to?(:call)
end
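To make the option handling concrete, here is a small standalone sketch of the same lookups. The function resolve_options and its env parameter are hypothetical and exist only so the defaults can be exercised without touching the real ENV or the library.

```ruby
# Hypothetical helper mirroring the constructor's option lookups.
# env defaults to ENV but accepts a plain Hash for experimentation.
def resolve_options(options, env = ENV)
  priority_key = options.fetch(:priority_key) # KeyError when missing

  unless priority_key.respond_to?(:call)
    raise ArgumentError, 'Invalid priority_key. Must be a lambda'
  end

  {
    priority_key: priority_key,
    max_wait: options.fetch(:max_wait, Integer(env['UPPERKUT_MAX_WAIT'] || 20)),
    batch_size: options.fetch(:batch_size, Integer(env['UPPERKUT_BATCH_SIZE'] || 1000))
  }
end

# 'tenant_id' is an assumed field name on the item.
by_tenant = ->(item) { item['tenant_id'] }

resolved = resolve_options({ priority_key: by_tenant }, {})
p resolved[:max_wait]   # => 20
p resolved[:batch_size] # => 1000
```

An explicit option wins over the environment variable, and the environment variable wins over the hard-coded default.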
Instance Method Details
#clear ⇒ Object
Public: Clear all data related to the strategy.
# File 'lib/upperkut/strategies/priority_queue.rb', line 130
def clear
  redis { |conn| conn.del(queue_key) }
end
#fetch_items ⇒ Object
Public: Retrieve events from Strategy.
Returns an Array containing the events as Hashes.
# File 'lib/upperkut/strategies/priority_queue.rb', line 117
def fetch_items
  batch_size = [@batch_size, size].min

  items = redis do |conn|
    conn.eval(DEQUEUE_ITEM,
              keys: [queue_checkpoint_key, queue_key],
              argv: [batch_size])
  end

  decode_json_items(items)
end
#metrics ⇒ Object
Public: Consolidated strategy metrics.
Returns a Hash containing metric names and values.
# File 'lib/upperkut/strategies/priority_queue.rb', line 150
def metrics
  {
    'size' => size
  }
end
#process? ⇒ Boolean
Public: Tells when to execute event processing. When this condition is met, the events are dispatched to the worker.
# File 'lib/upperkut/strategies/priority_queue.rb', line 137
def process?
  if fulfill_condition?(size)
    @waiting_time = 0
    return true
  end

  @waiting_time += @worker.setup.polling_interval
  false
end
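The body of fulfill_condition? is not shown on this page. The sketch below assumes it returns true once the queue holds a full batch or the accumulated waiting time reaches max_wait, and false for an empty queue. That rule, the class name ProcessGateSketch, and the explicit size argument are assumptions made for illustration only.

```ruby
# Sketch of the waiting-time bookkeeping in process?.
class ProcessGateSketch
  def initialize(batch_size:, max_wait:, polling_interval:)
    @batch_size = batch_size
    @max_wait = max_wait
    @polling_interval = polling_interval
    @waiting_time = 0
  end

  # size is passed in here; the real method reads it from the strategy.
  def process?(size)
    if fulfill_condition?(size)
      @waiting_time = 0
      return true
    end

    @waiting_time += @polling_interval
    false
  end

  private

  # ASSUMPTION: the real condition is not part of this page.
  def fulfill_condition?(size)
    return false if size.zero?

    size >= @batch_size || @waiting_time >= @max_wait
  end
end

gate = ProcessGateSketch.new(batch_size: 1000, max_wait: 20, polling_interval: 5)
p Array.new(5) { gate.process?(10) } # => [false, false, false, false, true]
```

Under these assumptions a small backlog is still processed after four polling intervals, while a full batch is dispatched immediately.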
#push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
items - The Array of items to be inserted.
Returns true on success and false when items is empty; raises on error.
# File 'lib/upperkut/strategies/priority_queue.rb', line 92
def push_items(items = [])
  items = [items] if items.is_a?(Hash)
  return false if items.empty?

  redis do |conn|
    items.each do |item|
      priority_key = @priority_key.call(item)
      score_key = "#{queue_key}:#{priority_key}:score"

      keys = [queue_checkpoint_key,
              score_key,
              queue_key]

      conn.eval(ENQUEUE_ITEM,
                keys: keys,
                argv: [encode_json_items([item])])
    end
  end

  true
end
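Each item is enqueued against a score key derived from its tenant, so tenants never share a score counter. The sketch below shows that derivation in isolation. The helper enqueue_keys, the key strings, and the 'tenant_id' field are hypothetical; the real queue_key and queue_checkpoint_key values are not shown on this page.

```ruby
# Builds the KEYS array passed to ENQUEUE_ITEM for one item.
# enqueue_keys and the key strings below are hypothetical.
def enqueue_keys(item, priority_key:, queue_key:, checkpoint_key:)
  tenant = priority_key.call(item)
  score_key = "#{queue_key}:#{tenant}:score" # one score key per tenant

  [checkpoint_key, score_key, queue_key]
end

by_tenant = ->(item) { item['tenant_id'] } # assumed item field

items = [
  { 'tenant_id' => 'acme', 'id' => 1 },
  { 'tenant_id' => 'globex', 'id' => 2 }
]

items.each do |item|
  p enqueue_keys(item,
                 priority_key: by_tenant,
                 queue_key: 'example:queue',
                 checkpoint_key: 'example:queue:checkpoint')
end
# => ["example:queue:checkpoint", "example:queue:acme:score", "example:queue"]
# => ["example:queue:checkpoint", "example:queue:globex:score", "example:queue"]
```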