Class: FiberJob::Queue

Inherits: Object
Defined in: lib/fiber_job/queue.rb
Overview
Queue provides Redis-based queue operations for job storage and retrieval. Handles immediate execution queues, scheduled jobs, priority handling, and failure tracking using Redis data structures.
The queue system uses Redis lists for immediate jobs, sorted sets for scheduled jobs, and separate lists for failed job tracking.
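The mapping between job state and Redis structure can be sketched as follows. The key prefixes ("queue:", "schedule:", and the "failed" list) appear in the method sources below; the value shapes here are illustrative only, not the gem's wire format:

```ruby
# Illustrative layout of the Redis keys used by FiberJob::Queue.
# "default" stands in for any configured queue name.
KEY_LAYOUT = {
  'queue:default'    => :list,        # immediate jobs: LPUSH in, BRPOP out
  'schedule:default' => :sorted_set,  # scheduled jobs, scored by Unix timestamp
  'failed'           => :list         # failed jobs with error metadata
}

KEY_LAYOUT.each { |key, type| puts "#{key} -> #{type}" }
```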
Class Method Summary
- .cancel_job(jid) ⇒ Boolean: Cancels a job by removing it from queues.
- .failed_jobs ⇒ Array<Hash>: Retrieves all failed jobs for inspection and debugging.
- .find_job(jid) ⇒ Hash?: Finds a job by its JID across all queues and scheduled jobs.
- .job_status(jid) ⇒ String: Gets the status of a job by its JID.
- .list_jobs(limit: 100) ⇒ Hash: Lists all jobs with their JIDs and statuses.
- .pop(queue_name, timeout: 0.1, redis_conn: nil) ⇒ Hash?: Removes and returns a job from the specified queue.
- .push(queue_name, payload) ⇒ Integer: Adds a job to the specified queue for immediate processing.
- .push_priority(queue_name, payload) ⇒ Integer: Adds a job to the head of the queue for priority processing.
- .redis ⇒ RedisClient: Deprecated. Use redis_pool.with { |redis| … } instead for better performance.
- .redis_connection ⇒ RedisClient: Deprecated. Use redis_pool.with { |redis| … } instead for better performance.
- .redis_pool ⇒ ConnectionPool: Returns the Redis connection pool.
- .schedule(queue_name, payload, scheduled_at) ⇒ Boolean: Schedules a job for execution at a specific time.
- .schedule_priority(queue_name, payload, scheduled_at) ⇒ Boolean: Schedules a job with priority for execution at a specific time.
- .scheduled_jobs(queue_name) ⇒ Integer: Processes scheduled jobs that are ready for execution.
- .stats(queue_name) ⇒ Hash: Returns statistics for the specified queue.
- .store_failed_job(job_data, error) ⇒ Integer: Stores a failed job with error information for later analysis.
Class Method Details
.cancel_job(jid) ⇒ Boolean
Cancels a job by removing it from queues. Can cancel jobs that are queued or scheduled, but not currently processing.
# File 'lib/fiber_job/queue.rb', line 308

def self.cancel_job(jid)
  return false unless jid

  redis_pool.with do |redis|
    # Search and remove from all known queues
    config.queues.each do |queue_name|
      # Check active queue
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, -1)
      queue_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          # LREM removes the first occurrence of the matching value
          redis.call("LREM", "queue:#{queue_name}", 1, job_json)
          return true
        end
      end

      # Check scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, -1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          redis.call("ZREM", "schedule:#{queue_name}", job_json)
          return true
        end
      end
    end
  end

  false
end
.failed_jobs ⇒ Array<Hash>
Retrieves all failed jobs for inspection and debugging.
# File 'lib/fiber_job/queue.rb', line 244

def self.failed_jobs
  redis_pool.with do |redis|
    redis.call("LRANGE", 'failed', 0, -1).map { |job_json| JSON.parse(job_json) }
  end
end
.find_job(jid) ⇒ Hash?
Finds a job by its JID across all queues and scheduled jobs. Searches through active queues, scheduled jobs, and failed jobs.
# File 'lib/fiber_job/queue.rb', line 261

def self.find_job(jid)
  return nil unless jid

  redis_pool.with do |redis|
    # Search in all known queues
    config.queues.each do |queue_name|
      # Check active queue
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, -1)
      queue_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          return job_data.merge('status' => 'queued', 'queue' => queue_name)
        end
      end

      # Check scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, -1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          return job_data.merge('status' => 'scheduled', 'queue' => queue_name)
        end
      end
    end

    # Check failed jobs
    failed_jobs = redis.call("LRANGE", 'failed', 0, -1)
    failed_jobs.each do |job_json|
      job_data = JSON.parse(job_json)
      if job_data['jid'] == jid
        return job_data.merge('status' => 'failed')
      end
    end
  end

  nil
end
.job_status(jid) ⇒ String
Gets the status of a job by its JID. Possible statuses: ‘queued’, ‘scheduled’, ‘failed’, ‘not_found’
# File 'lib/fiber_job/queue.rb', line 358

def self.job_status(jid)
  job = find_job(jid)
  job ? job['status'] : 'not_found'
end
.list_jobs(limit: 100) ⇒ Hash
Lists all jobs with their JIDs and statuses. Useful for debugging and monitoring.
# File 'lib/fiber_job/queue.rb', line 372

def self.list_jobs(limit: 100)
  result = { 'queued' => [], 'scheduled' => [], 'failed' => [] }

  redis_pool.with do |redis|
    config.queues.each do |queue_name|
      # Get queued jobs
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, limit - 1)
      queue_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        result['queued'] << job_data.merge('queue' => queue_name)
      end

      # Get scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, limit - 1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        result['scheduled'] << job_data.merge('queue' => queue_name)
      end
    end

    # Get failed jobs
    failed_jobs = redis.call("LRANGE", 'failed', 0, limit - 1)
    failed_jobs.each do |job_json|
      job_data = JSON.parse(job_json)
      result['failed'] << job_data
    end
  end

  result
end
.pop(queue_name, timeout: 0.1, redis_conn: nil) ⇒ Hash?
Removes and returns a job from the specified queue. Blocks for the specified timeout waiting for jobs to become available. Uses connection pool by default for better performance.
# File 'lib/fiber_job/queue.rb', line 112

def self.pop(queue_name, timeout: 0.1, redis_conn: nil)
  data = if redis_conn
    # Use provided connection (for legacy compatibility)
    redis_conn.call("BRPOP", "queue:#{queue_name}", timeout)
  else
    # Use connection pool for better performance
    redis_pool.with { |redis| redis.call("BRPOP", "queue:#{queue_name}", timeout) }
  end

  data ? JSON.parse(data[1]) : nil
end
.push(queue_name, payload) ⇒ Integer
Adds a job to the specified queue for immediate processing. Jobs are added to the left side of the list (LPUSH) and consumed from the right side (BRPOP), giving FIFO behavior.
# File 'lib/fiber_job/queue.rb', line 80

def self.push(queue_name, payload)
  redis_pool.with { |redis| redis.call("LPUSH", "queue:#{queue_name}", JSON.dump(payload)) }
end
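The FIFO ordering can be seen with a minimal in-memory stand-in for the Redis list, where LPUSH maps to Array#unshift and BRPOP to Array#pop. This is an illustration of the semantics only, not the gem's implementation:

```ruby
require 'json'

# Array stand-in: LPUSH adds at the left (index 0), BRPOP takes from the right.
list  = []
lpush = ->(item) { list.unshift(item) }  # LPUSH queue:default <json>
brpop = -> { list.pop }                  # BRPOP queue:default

lpush.call(JSON.dump({ 'jid' => 'a1', 'args' => [1] }))
lpush.call(JSON.dump({ 'jid' => 'b2', 'args' => [2] }))

first = JSON.parse(brpop.call)
puts first['jid'] # => "a1" (the job pushed first comes out first)
```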
.push_priority(queue_name, payload) ⇒ Integer
Adds a job to the head of the queue for priority processing. Priority jobs are processed before regular jobs in the same queue.
# File 'lib/fiber_job/queue.rb', line 93

def self.push_priority(queue_name, payload)
  # Add to the head of the queue for priority execution
  redis_pool.with { |redis| redis.call("RPUSH", "queue:#{queue_name}", JSON.dump(payload)) }
end
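Because workers BRPOP from the right, "head of the queue" here means the pop side: RPUSH places the job where it will be dequeued next, ahead of jobs that were LPUSHed earlier. A sketch with the same Array stand-in as above (illustrative only):

```ruby
require 'json'

list = []
list.unshift(JSON.dump({ 'jid' => 'regular' }))  # LPUSH  (normal push)
list.push(JSON.dump({ 'jid' => 'urgent' }))      # RPUSH  (push_priority)

next_job = JSON.parse(list.pop)                  # BRPOP
puts next_job['jid'] # => "urgent" (priority job jumps the line)
```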
.redis ⇒ RedisClient
Deprecated. Use redis_pool.with { |redis| … } instead for better performance.
Returns the shared Redis connection instance. Creates a new connection if one doesn’t exist.
# File 'lib/fiber_job/queue.rb', line 55

def self.redis
  @redis ||= RedisClient.new(url: config.redis_url)
end
.redis_connection ⇒ RedisClient
Deprecated. Use redis_pool.with { |redis| … } instead for better performance.
Creates a new Redis connection for fiber-safe operations. Used when concurrent operations need separate connections.
# File 'lib/fiber_job/queue.rb', line 64

def self.redis_connection
  # Create a new Redis connection for fiber-safe operations
  RedisClient.new(url: config.redis_url)
end
.redis_pool ⇒ ConnectionPool
Returns the shared Redis connection pool. This is the preferred way to obtain Redis connections, as it provides pooling and thread/fiber safety; call redis_pool.with { |redis| … } to check out a connection.
# File 'lib/fiber_job/queue.rb', line 44

def self.redis_pool
  @redis_pool ||= ConnectionPool.new(size: config.pool_size, timeout: 5) do
    RedisClient.new(url: config.redis_url)
  end
end
.schedule(queue_name, payload, scheduled_at) ⇒ Boolean
Schedules a job for execution at a specific time. Uses Redis sorted sets with timestamp as score for efficient time-based retrieval.
# File 'lib/fiber_job/queue.rb', line 135

def self.schedule(queue_name, payload, scheduled_at)
  redis_pool.with { |redis| redis.call("ZADD", "schedule:#{queue_name}", scheduled_at, JSON.dump(payload)) }
end
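The sorted-set model can be sketched in memory: ZADD stores the JSON payload as a member with the execution timestamp as its score, and the due jobs are exactly the members whose score is at or below the current time (what ZRANGEBYSCORE 0 now retrieves). A Hash stands in for the sorted set here; illustrative only:

```ruby
require 'json'

schedule = {}  # member (payload JSON) => score (Unix timestamp)
zadd = ->(score, member) { schedule[member] = score }

now = Time.now.to_f
zadd.call(now - 5,  JSON.dump({ 'jid' => 'due' }))     # already due
zadd.call(now + 60, JSON.dump({ 'jid' => 'future' }))  # due in a minute

# ZRANGEBYSCORE schedule:default 0 <now>
due = schedule.select { |_member, score| score <= now }.keys
puts due.map { |j| JSON.parse(j)['jid'] } # only the past-due job is returned
```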
.schedule_priority(queue_name, payload, scheduled_at) ⇒ Boolean
Schedules a job with priority for execution at a specific time. Priority scheduled jobs are moved to the head of the queue when ready.
# File 'lib/fiber_job/queue.rb', line 146

def self.schedule_priority(queue_name, payload, scheduled_at)
  # Mark as priority retry for head-of-queue execution
  priority_payload = payload.merge('priority_retry' => true)
  redis_pool.with { |redis| redis.call("ZADD", "schedule:#{queue_name}", scheduled_at, JSON.dump(priority_payload)) }
end
.scheduled_jobs(queue_name) ⇒ Integer
Processes scheduled jobs that are ready for execution. Moves jobs from the scheduled set to the appropriate queue when their scheduled time has arrived. Uses Redis pipeline for better performance when processing multiple jobs.
# File 'lib/fiber_job/queue.rb', line 162

def self.scheduled_jobs(queue_name)
  redis_pool.with do |redis|
    now = Time.now.to_f
    jobs = redis.call("ZRANGEBYSCORE", "schedule:#{queue_name}", 0, now)
    return 0 if jobs.empty?

    # Use pipeline for better performance with multiple operations
    redis.pipelined do |pipeline|
      jobs.each do |job_json|
        pipeline.call("ZREM", "schedule:#{queue_name}", job_json)

        job_data = JSON.parse(job_json)
        # Use priority queue for retries
        if job_data['priority_retry']
          job_data.delete('priority_retry') # Clean up the flag
          pipeline.call("RPUSH", "queue:#{queue_name}", JSON.dump(job_data))
        else
          pipeline.call("LPUSH", "queue:#{queue_name}", JSON.dump(job_data))
        end
      end
    end

    jobs.size
  end
end
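The promote step above can be traced with the same in-memory stand-ins: each due member is removed from the schedule and enqueued, with the 'priority_retry' flag (set by schedule_priority) routing the job to the pop side. This sketch mirrors the logic but is not the gem's code:

```ruby
require 'json'

now = Time.now.to_f
schedule = {
  JSON.dump({ 'jid' => 'r1', 'priority_retry' => true }) => now - 1,
  JSON.dump({ 'jid' => 'n1' })                           => now - 1
}
queue = []  # index 0 = LPUSH side; workers pop from the right

due = schedule.select { |_member, score| score <= now }.keys
due.each do |job_json|
  schedule.delete(job_json)                 # ZREM
  job = JSON.parse(job_json)
  if job.delete('priority_retry')           # clean up the flag
    queue.push(JSON.dump(job))              # RPUSH: dequeued next
  else
    queue.unshift(JSON.dump(job))           # LPUSH: normal FIFO position
  end
end

next_job = JSON.parse(queue.pop)            # BRPOP
puts next_job['jid'] # => "r1" (the priority retry runs first)
```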
.stats(queue_name) ⇒ Hash
Returns statistics for the specified queue. Provides insight into queue depth, scheduled jobs, and processing status. Uses pipeline for efficient batch operations.
# File 'lib/fiber_job/queue.rb', line 199

def self.stats(queue_name)
  redis_pool.with do |redis|
    results = redis.pipelined do |pipeline|
      pipeline.call("LLEN", "queue:#{queue_name}")
      pipeline.call("ZCARD", "schedule:#{queue_name}")
      pipeline.call("GET", "processing:#{queue_name}")
    end

    {
      size: results[0],
      scheduled: results[1],
      processing: results[2].to_i
    }
  end
end
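One detail worth noting: the GET on the processing:<queue> key returns nil when the key is absent, and to_i coerces that nil to 0, so the stats hash is always numeric. A minimal sketch of assembling the hash from hypothetical pipeline results:

```ruby
# Hypothetical pipeline replies: LLEN, ZCARD, GET (key absent => nil)
results = [3, 1, nil]

stats = {
  size:       results[0],
  scheduled:  results[1],
  processing: results[2].to_i  # nil.to_i == 0
}
puts stats # => {:size=>3, :scheduled=>1, :processing=>0}
```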
.store_failed_job(job_data, error) ⇒ Integer
Stores a failed job with error information for later analysis. Failed jobs are stored with original job data plus failure metadata.
# File 'lib/fiber_job/queue.rb', line 228

def self.store_failed_job(job_data, error)
  failed_job_data = job_data.merge({
    'failed_at' => Time.now.to_f,
    'error' => error.message,
    'backtrace' => error.backtrace&.first(10)
  })

  redis_pool.with { |redis| redis.call("LPUSH", 'failed', JSON.dump(failed_job_data)) }
end
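The failure record merges the original job data with a timestamp, the exception message, and a truncated backtrace. A self-contained sketch of building that record from a rescued exception (the job fields here are made up for illustration):

```ruby
require 'json'

job_data = { 'jid' => 'x9', 'class' => 'MyJob', 'args' => [42] }

begin
  raise ArgumentError, 'boom'
rescue => error
  failed = job_data.merge(
    'failed_at' => Time.now.to_f,
    'error'     => error.message,
    'backtrace' => error.backtrace&.first(10)  # keep at most 10 frames
  )
  puts failed['error'] # => "boom"
end
```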