Class: Upperkut::Strategies::PriorityQueue
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/priority_queue.rb
Overview
Public: Queue that prevent a single tenant from taking over.
Constant Summary collapse
- ONE_DAY_IN_SECONDS =
86400- ENQUEUE_ITEM =
Logic as follows:
We keep the last score used for each tenant key. One tenant_key is
an tenant unique id. To calculate the next_score we use max(current_tenant_score, current_global_score) + increment we store the queue in a sorted set using the next_score as ordering key if one tenant sends lots of messages, this tenant ends up with lots of messages in the queue spaced by increment if another tenant then sends a message, since it previous_tenant_score is lower than the first tenant, it will be inserted before it in the queue.In other words, the idea of this queue is to not allowing an tenant
that sends a lot of messages to dominate processing and give a chance for tenants that sends few messages to have a fair share of processing time. %( local increment = 1 local current_checkpoint = tonumber(redis.call("GET", KEYS[1])) or 0 local score_key = KEYS[2] local current_score = tonumber(redis.call("GET", score_key)) or 0 local queue_key = KEYS[3] local next_score = nil if current_score >= current_checkpoint then next_score = current_score + increment else next_score = current_checkpoint + increment end redis.call("SETEX", score_key, #{ONE_DAY_IN_SECONDS}, next_score) redis.call("ZADD", queue_key, next_score, ARGV[1]) return next_score ).freeze
- DEQUEUE_ITEM =
Uses ZPOP* functions available only on redis 5.0.0+
%( local checkpoint_key = KEYS[1] local queue_key = KEYS[2] local batch_size = ARGV[1] local popped_items = redis.call("ZPOPMIN", queue_key, batch_size) local items = {} local last_score = 0 for i, v in ipairs(popped_items) do if i % 2 == 1 then table.insert(items, v) else last_score = v end end redis.call("SETEX", checkpoint_key, 86400, last_score) return items ).freeze
Instance Method Summary collapse
-
#clear ⇒ Object
Public: Clear all data related to the strategy.
-
#fetch_items ⇒ Object
Public: Retrieve events from Strategy.
-
#initialize(worker, options) ⇒ PriorityQueue
constructor
A new instance of PriorityQueue.
-
#metrics ⇒ Object
Public: Consolidated strategy metrics.
-
#process? ⇒ Boolean
Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.
-
#push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
Methods included from Util
#decode_json_items, #normalize_items, #retry_block, #to_underscore
Constructor Details
#initialize(worker, options) ⇒ PriorityQueue
Returns a new instance of PriorityQueue.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 69 def initialize(worker, ) @worker = worker @options = @priority_key = .fetch(:priority_key) @redis_options = .fetch(:redis, {}) @max_wait = .fetch( :max_wait, Integer(ENV['UPPERKUT_MAX_WAIT'] || 20) ) @batch_size = .fetch( :batch_size, Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000) ) @waiting_time = 0 raise ArgumentError, 'Invalid priority_key. ' \ 'Must be a lambda' unless @priority_key.respond_to?(:call) end |
Instance Method Details
#clear ⇒ Object
Public: Clear all data related to the strategy.
134 135 136 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 134 def clear redis { |conn| conn.del(queue_key) } end |
#fetch_items ⇒ Object
Public: Retrieve events from Strategy.
Returns an Array containing events as hash.
121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 121 def fetch_items batch_size = [@batch_size, size].min items = redis do |conn| conn.eval(DEQUEUE_ITEM, keys: [queue_checkpoint_key, queue_key], argv: [batch_size]) end decode_json_items(items) end |
#metrics ⇒ Object
Public: Consolidated strategy metrics.
Returns hash containing metric name and values.
154 155 156 157 158 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 154 def metrics { 'size' => size } end |
#process? ⇒ Boolean
Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.
141 142 143 144 145 146 147 148 149 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 141 def process? if fulfill_condition?(size) @waiting_time = 0 return true end @waiting_time += @worker.setup.polling_interval false end |
#push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
items - The Array of items do be inserted.
Returns true when success, raise when error.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 96 def push_items(items = []) items = normalize_items(items) return false if items.empty? redis do |conn| items.each do |item| priority_key = @priority_key.call(item) score_key = "#{queue_key}:#{priority_key}:score" keys = [queue_checkpoint_key, score_key, queue_key] conn.eval(ENQUEUE_ITEM, keys: keys, argv: [item.to_json]) end end true end |