Class: Karafka::Pro::Processing::JobsQueue
- Inherits: Karafka::Processing::JobsQueue
  - Object
  - Karafka::Processing::JobsQueue
  - Karafka::Pro::Processing::JobsQueue
- Includes: Core::Helpers::Time
- Defined in: lib/karafka/pro/processing/jobs_queue.rb
Overview
Enhanced processing queue that provides the ability to build complex work-distribution schedulers dedicated to particular job types.
Aside from the OSS queue capabilities, it allows for jobless locking (locking a subscription group's queue without enqueuing a job) for advanced schedulers.
Instance Attribute Summary
- #in_processing ⇒ Object
  Returns the value of attribute in_processing.
Instance Method Summary
- #clear(group_id) ⇒ Object
  Clears the processing states for a provided group.
- #empty?(group_id) ⇒ Boolean
  Tells whether a given group has no jobs in processing, no jobs waiting, and is not async-locked.
- #initialize ⇒ Karafka::Pro::Processing::JobsQueue (constructor)
- #lock(job) ⇒ Object
  Allows us to lock the queue of a given subscription group without enqueuing a job.
- #lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
  Allows for explicit locking of the queue of a given subscription group.
- #register(group_id) ⇒ Object
  Registers a semaphore and a lock hash for a given group.
- #unlock(job) ⇒ Object
  Unlocks the queue space of a given subscription group that was locked via #lock with a job that was never added to the queue.
- #unlock_async(group_id, lock_id) ⇒ Object
  Allows for explicit unlocking of the locked queue of a group.
- #wait(group_id) ⇒ Object
  Blocks while a given group has work in the queue and waits until all the blocking jobs from that group are completed or any of the locks times out.
Methods inherited from Karafka::Processing::JobsQueue
#<<, #close, #complete, #pop, #statistics, #tick
Constructor Details
#initialize ⇒ Karafka::Pro::Processing::JobsQueue
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 41
def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end
```
Instance Attribute Details
#in_processing ⇒ Object
Returns the value of attribute in_processing.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 33
def in_processing
  @in_processing
end
```
Instance Method Details
#clear(group_id) ⇒ Object
Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 139
def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end
```
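The counter bookkeeping in #clear can be illustrated with plain Ruby objects. In this minimal sketch, `statistics` and `in_waiting` are hypothetical stand-ins for the queue's instance variables and `clear_group` is a hypothetical helper, not part of Karafka. It shows that only the cleared group's waiting jobs are subtracted from the global `:waiting` counter, while other groups stay untouched.

```ruby
# Hypothetical stand-ins for the queue's internal state (not Karafka's API)
statistics = { waiting: 0 }
in_waiting = Hash.new { |h, k| h[k] = [] }

# Simulate three jobs locked for group "sg-1" and one for "sg-2"
%w[a b c].each { |job| in_waiting["sg-1"] << job; statistics[:waiting] += 1 }
in_waiting["sg-2"] << "d"
statistics[:waiting] += 1

# Mirrors the relevant part of #clear: subtract this group's waiting jobs
# from the global counter, then drop them
def clear_group(statistics, in_waiting, group_id)
  statistics[:waiting] -= in_waiting[group_id].size
  in_waiting[group_id].clear
end

clear_group(statistics, in_waiting, "sg-1")

puts statistics[:waiting]       # => 1
puts in_waiting["sg-2"].inspect # => ["d"]
```

Subtracting the group's size before clearing keeps the global counter consistent without recounting every group.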
#empty?(group_id) ⇒ Boolean
Returns true when a given group has no jobs in processing, no jobs waiting, and is not async-locked.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 157
def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end
```
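#empty? depends on a private `locked_async?` predicate whose source is not shown on this page. The sketch below assumes it reports a group as locked while at least one of its lock deadlines is still in the future. `locked_async?`, `group_empty?`, and the plain hashes are hypothetical stand-ins, and `monotonic_now` is assumed to return milliseconds, as the `/ 1_000` conversion in #wait suggests.

```ruby
# Monotonic clock in milliseconds (assumed unit for lock deadlines)
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

# Hypothetical predicate: a group counts as async-locked while at least one
# of its lock deadlines (in ms) is still in the future
def locked_async?(locks, group_id)
  now = monotonic_now
  locks[group_id].values.any? { |deadline| deadline > now }
end

# Same three-way check as #empty?, without the mutex
def group_empty?(in_processing, in_waiting, locks, group_id)
  in_processing[group_id].empty? &&
    in_waiting[group_id].empty? &&
    !locked_async?(locks, group_id)
end

in_processing = Hash.new { |h, k| h[k] = [] }
in_waiting = Hash.new { |h, k| h[k] = [] }
locks = Hash.new { |h, k| h[k] = {} }

puts group_empty?(in_processing, in_waiting, locks, "sg-1") # => true

# An active 10-second lock makes the group non-empty
locks["sg-1"][:sync] = monotonic_now + 10_000
puts group_empty?(in_processing, in_waiting, locks, "sg-1") # => false

# An already expired lock no longer counts
locks["sg-1"][:sync] = monotonic_now - 1
puts group_empty?(in_processing, in_waiting, locks, "sg-1") # => true
```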
#lock(job) ⇒ Object
Allows us to lock the queue of a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 66
def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
```
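A toy model can make jobless locking concrete: the job is recorded as waiting for its group, which keeps that group busy, but it is never pushed onto a work queue. `ToyLockQueue`, `Job`, `locked?`, and `SynchronizationError` are all hypothetical names used only for this sketch; `SynchronizationError` stands in for Errors::JobsQueueSynchronizationError.

```ruby
# Hypothetical stand-in for Errors::JobsQueueSynchronizationError
class SynchronizationError < StandardError; end

# Hypothetical job; it only needs a #group_id, like the jobs passed to #lock
Job = Struct.new(:id, :group_id)

# Minimal sketch of jobless locking: jobs are tracked as "waiting" per group
# without ever being enqueued for processing
class ToyLockQueue
  attr_reader :waiting

  def initialize
    @mutex = Mutex.new
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @waiting = 0
  end

  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]
      # Locking the same job twice indicates a scheduler bug
      raise(SynchronizationError, job.group_id) if group.include?(job)

      @waiting += 1
      group << job
    end
  end

  # Hypothetical query: a group stays busy while it has locked jobs
  def locked?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

queue = ToyLockQueue.new
job = Job.new(1, "sg-1")
queue.lock(job)
puts queue.locked?("sg-1") # => true

begin
  queue.lock(job)
rescue SynchronizationError => e
  puts "double lock rejected for #{e.message}" # => double lock rejected for sg-1
end
```

The duplicate check runs before the counter is incremented, so a rejected double lock leaves the counter unchanged.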
#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object
Note: Unlike #lock, this method does not raise Errors::JobsQueueSynchronizationError when the lock already exists, because we want to be able to prolong time-limited locks.
Allows for explicit locking of the queue of a given subscription group.
This can be used for cross-topic synchronization.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 107
def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
```
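Prolonging a time-limited lock works because each `lock_id` maps to a single deadline, so locking again with the same `lock_id` overwrites that deadline instead of adding a second lock. The sketch below is a hypothetical reduction of the core assignment in #lock_async; it passes explicit timeouts because the value of WAIT_TIMEOUT is not given here, and it assumes `monotonic_now` returns milliseconds.

```ruby
# Monotonic clock in milliseconds (assumed unit, consistent with the
# / 1_000 conversion to seconds in #wait)
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

locks = Hash.new { |h, k| h[k] = {} }

# Hypothetical helper mirroring the assignment inside #lock_async;
# the assignment expression returns the stored deadline
def lock_async(locks, group_id, lock_id, timeout:)
  locks[group_id][lock_id] = monotonic_now + timeout
end

first = lock_async(locks, "sg-1", :cross_topic, timeout: 1_000)
# Locking again with the same lock_id does not raise; it replaces (prolongs)
# the deadline instead of adding a second lock
second = lock_async(locks, "sg-1", :cross_topic, timeout: 5_000)

puts locks["sg-1"].size # => 1
puts second > first     # => true
```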
#register(group_id) ⇒ Object
Registers a semaphore and a lock hash for a given group.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 54
def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end
```
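The body of #register only reads `@locks[group_id]`, yet that is enough to register the lock hash: the constructor builds `@locks` with a default block that stores a fresh hash on first access. This standalone snippet demonstrates that plain Ruby behavior; the `locks` variable is a stand-in for `@locks`.

```ruby
# Same construction as @locks in the constructor
locks = Hash.new { |h, k| h[k] = {} }

puts locks.key?("sg-1") # => false

# Merely reading the key runs the default block, which stores an empty hash
locks["sg-1"]

puts locks.key?("sg-1")    # => true
puts locks["sg-1"].inspect # => {}
```

Because reading the key actually writes to the hash, doing it inside `@mutex.synchronize` keeps that write synchronized with the other methods that touch `@locks`.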
#unlock(job) ⇒ Object
Unlocks the queue space of a given subscription group that was locked via #lock with a job that was never added to the queue.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 83
def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end
```
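#unlock uses the return value of `Array#delete` as its success check: it returns the deleted element when a match is found and `nil` otherwise, and the `nil` case is the one that raises. This standalone snippet shows that behavior; `in_waiting` and `job` are stand-ins for the queue's state and a real job.

```ruby
in_waiting = Hash.new { |h, k| h[k] = [] }
job = Object.new
in_waiting["sg-1"] << job

# Array#delete returns the removed element when found...
puts in_waiting["sg-1"].delete(job).equal?(job) # => true
# ...and nil when the element is absent, which #unlock treats as a
# synchronization error
puts in_waiting["sg-1"].delete(job).inspect     # => nil
```

`Array#delete` removes every equal element, which is safe here because #lock refuses to record the same job twice.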
#unlock_async(group_id, lock_id) ⇒ Object
Allows for explicit unlocking of the locked queue of a group.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 123
def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end
```
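#unlock_async relies on `Hash#delete` returning the removed value, here a numeric deadline that is always truthy, or `nil` when the `lock_id` is unknown. As a consequence, unlocking the same lock twice raises. The sketch below mirrors only that control flow, without the mutex or the tick; the helper and `SynchronizationError` (a stand-in for Errors::JobsQueueSynchronizationError) are hypothetical.

```ruby
# Hypothetical stand-in for Errors::JobsQueueSynchronizationError
class SynchronizationError < StandardError; end

locks = Hash.new { |h, k| h[k] = {} }
locks["sg-1"][:cross_topic] = Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000 + 1_000

# Hypothetical helper mirroring the control flow of #unlock_async
def unlock_async(locks, group_id, lock_id)
  # Hash#delete returns the removed deadline (always truthy) or nil
  return :unlocked if locks[group_id].delete(lock_id)

  raise(SynchronizationError, [group_id, lock_id].inspect)
end

puts unlock_async(locks, "sg-1", :cross_topic) # => unlocked

begin
  # The lock is already gone, so this second call raises
  unlock_async(locks, "sg-1", :cross_topic)
rescue SynchronizationError => e
  puts "unknown lock: #{e.message}"
end
```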
#wait(group_id) ⇒ Object
Note: Because the check for async locking happens on regular ticking, the first lock on a group can take up to one tick to take effect. This is expected.
This implementation takes temporary async locks into consideration. Because the minimum remaining lock time is used as the wait timeout, we do not have to wait a whole tick interval for an async lock to expire.
Blocks when there are things in the queue in a given group and waits until all the blocking jobs from that group are completed or any of the locks times out.
```ruby
# File 'lib/karafka/pro/processing/jobs_queue.rb', line 176
def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
```
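The timeout passed to the semaphore's `pop` is the smallest of the remaining lock times and the regular tick interval, with expired locks discarded. The hypothetical pure helper `next_wait_timeout` below reproduces that arithmetic so it can be checked in isolation. It assumes lock deadlines, the current time, and the tick interval are all in milliseconds, and it returns seconds, the unit that `Thread::Queue#pop(timeout:)` (available since Ruby 3.2) expects, assuming the semaphore is such a queue.

```ruby
# Hypothetical pure helper reproducing how #wait picks its sleep timeout.
# Inputs are in milliseconds; the result is in seconds.
def next_wait_timeout(lock_deadlines_ms, now_ms, tick_interval_ms)
  base_interval = tick_interval_ms / 1_000.0

  wait_times = lock_deadlines_ms.map { |deadline| (deadline - now_ms) / 1_000.0 }
  # Expired locks must not shorten the wait
  wait_times.delete_if(&:negative?)
  # The regular tick interval is always an upper bound
  wait_times << base_interval

  wait_times.min
end

now = 10_000.0
# One lock expires in 200 ms, another already expired; tick every 5 s
p next_wait_timeout([10_200.0, 9_000.0], now, 5_000) # => 0.2
# No active locks: fall back to the tick interval
p next_wait_timeout([], now, 5_000)                  # => 5.0
```

Taking the minimum means the loop wakes up right when the nearest lock expires, instead of sleeping for a full tick.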