Class: FiberJob::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/fiber_job/queue.rb

Overview

Queue provides Redis-based queue operations for job storage and retrieval. Handles immediate execution queues, scheduled jobs, priority handling, and failure tracking using Redis data structures.

The queue system uses Redis lists for immediate jobs, sorted sets for scheduled jobs, and separate lists for failed job tracking.

Examples:

Basic queue operations

# Push a job to queue
FiberJob::Queue.push(:default, job_data)

# Pop a job from queue
job = FiberJob::Queue.pop(:default, timeout: 1.0)

# Schedule a job for later
FiberJob::Queue.schedule(:default, job_data, Time.now.to_f + 3600)

Queue statistics

stats = FiberJob::Queue.stats(:default)
puts "Queue size: #{stats[:size]}"
puts "Scheduled jobs: #{stats[:scheduled]}"

See Also:

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configObject



34
35
36
# File 'lib/fiber_job/queue.rb', line 34

def config
  @config ||= FiberJob.config
end

Class Method Details

.cancel_job(jid) ⇒ Boolean

Cancels a job by removing it from queues. Can cancel jobs that are queued or scheduled, but not currently processing.

Examples:

Cancel a job

success = FiberJob::Queue.cancel_job("01J4X8K2V3N9QRSTUVWXYZ1234")
puts success ? "Job cancelled" : "Job not found or already processing"


308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/fiber_job/queue.rb', line 308

def self.cancel_job(jid)
  return false unless jid

  redis_pool.with do |redis|
    # Search and remove from all known queues
    config.queues.each do |queue_name|
      # Check active queue
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, -1)
      queue_jobs.each_with_index do |job_json, index|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          # Remove from queue by index (note: LREM removes by value)
          redis.call("LREM", "queue:#{queue_name}", 1, job_json)
          return true
        end
      end

      # Check scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, -1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          redis.call("ZREM", "schedule:#{queue_name}", job_json)
          return true
        end
      end
    end
  end

  false
end

.failed_jobsArray<Hash>

Retrieves all failed jobs for inspection and debugging.

Examples:

Get failed jobs

failed = FiberJob::Queue.failed_jobs
failed.each { |job| puts "Failed: #{job['class']} - #{job['error']}" }


244
245
246
247
248
# File 'lib/fiber_job/queue.rb', line 244

def self.failed_jobs
  redis_pool.with do |redis|
    redis.call("LRANGE", 'failed', 0, -1).map { |job_json| JSON.parse(job_json) }
  end
end

.find_job(jid) ⇒ Hash?

Finds a job by its JID across all queues and scheduled jobs. Searches through active queues, scheduled jobs, and failed jobs.

Examples:

Find job by JID

job = FiberJob::Queue.find_job("01J4X8K2V3N9QRSTUVWXYZ1234")
if job
  puts "Found job: #{job['class']} in #{job['status']}"
end


261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/fiber_job/queue.rb', line 261

def self.find_job(jid)
  return nil unless jid

  redis_pool.with do |redis|
    # Search in all known queues
    config.queues.each do |queue_name|
      # Check active queue
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, -1)
      queue_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          return job_data.merge('status' => 'queued', 'queue' => queue_name)
        end
      end

      # Check scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, -1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        if job_data['jid'] == jid
          return job_data.merge('status' => 'scheduled', 'queue' => queue_name)
        end
      end
    end

    # Check failed jobs
    failed_jobs = redis.call("LRANGE", 'failed', 0, -1)
    failed_jobs.each do |job_json|
      job_data = JSON.parse(job_json)
      if job_data['jid'] == jid
        return job_data.merge('status' => 'failed')
      end
    end
  end

  nil
end

.job_status(jid) ⇒ String

Gets the status of a job by its JID. Possible statuses: ‘queued’, ‘scheduled’, ‘failed’, ‘not_found’

Examples:

Check job status

status = FiberJob::Queue.job_status("01J4X8K2V3N9QRSTUVWXYZ1234")
case status
when 'queued'
  puts "Job is waiting to be processed"
when 'scheduled'
  puts "Job is scheduled for future execution"
when 'failed'
  puts "Job failed permanently"
when 'not_found'
  puts "Job not found (may have completed successfully)"
end


358
359
360
361
# File 'lib/fiber_job/queue.rb', line 358

def self.job_status(jid)
  job = find_job(jid)
  job ? job['status'] : 'not_found'
end

.list_jobs(limit: 100) ⇒ Hash

Lists all jobs with their JIDs and statuses. Useful for debugging and monitoring.

Examples:

List all jobs

jobs = FiberJob::Queue.list_jobs(limit: 50)
jobs['queued'].each { |job| puts "Queued: #{job['class']} [#{job['jid']}]" }


372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/fiber_job/queue.rb', line 372

def self.list_jobs(limit: 100)
  result = {
    'queued' => [],
    'scheduled' => [],
    'failed' => []
  }

  redis_pool.with do |redis|
    # Get queued jobs
    config.queues.each do |queue_name|
      queue_jobs = redis.call("LRANGE", "queue:#{queue_name}", 0, limit - 1)
      queue_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        result['queued'] << job_data.merge('queue' => queue_name)
      end

      # Get scheduled jobs
      scheduled_jobs = redis.call("ZRANGE", "schedule:#{queue_name}", 0, limit - 1)
      scheduled_jobs.each do |job_json|
        job_data = JSON.parse(job_json)
        result['scheduled'] << job_data.merge('queue' => queue_name)
      end
    end

    # Get failed jobs
    failed_jobs = redis.call("LRANGE", 'failed', 0, limit - 1)
    failed_jobs.each do |job_json|
      job_data = JSON.parse(job_json)
      result['failed'] << job_data
    end
  end

  result
end

.pop(queue_name, timeout: 0.1, redis_conn: nil) ⇒ Hash?

Removes and returns a job from the specified queue. Blocks for the specified timeout waiting for jobs to become available. Uses connection pool by default for better performance.

Examples:

Pop from queue with timeout

job = FiberJob::Queue.pop(:default, timeout: 5.0)
if job
  puts "Processing job: #{job['class']}"
end


112
113
114
115
116
117
118
119
120
121
# File 'lib/fiber_job/queue.rb', line 112

def self.pop(queue_name, timeout: 0.1, redis_conn: nil)
  data = if redis_conn
           # Use provided connection (for legacy compatibility)
           redis_conn.call("BRPOP", "queue:#{queue_name}", timeout)
         else
           # Use connection pool for better performance
           redis_pool.with { |redis| redis.call("BRPOP", "queue:#{queue_name}", timeout) }
         end
  data ? JSON.parse(data[1]) : nil
end

.push(queue_name, payload) ⇒ Integer

Adds a job to the specified queue for immediate processing. Jobs are added to the left side of the list (LPUSH) and processed from the right side (BRPOP) implementing FIFO behavior.

Examples:

Push a job

payload = { 'class' => 'EmailJob', 'args' => [123, 'message'] }
FiberJob::Queue.push(:default, payload)


80
81
82
# File 'lib/fiber_job/queue.rb', line 80

def self.push(queue_name, payload)
  redis_pool.with { |redis| redis.call("LPUSH", "queue:#{queue_name}", JSON.dump(payload)) }
end

.push_priority(queue_name, payload) ⇒ Integer

Adds a job to the head of the queue for priority processing. Priority jobs are processed before regular jobs in the same queue.

Examples:

Push priority job

FiberJob::Queue.push_priority(:default, urgent_job_data)


93
94
95
96
# File 'lib/fiber_job/queue.rb', line 93

def self.push_priority(queue_name, payload)
  # Add to the head of the queue for priority execution
  redis_pool.with { |redis| redis.call("RPUSH", "queue:#{queue_name}", JSON.dump(payload)) }
end

.redisRedisClient

Deprecated.

Use redis_pool.with { |redis| … } instead for better performance

Returns the shared Redis connection instance. Creates a new connection if one doesn’t exist.



55
56
57
# File 'lib/fiber_job/queue.rb', line 55

def self.redis
  @redis ||= RedisClient.new(url: config.redis_url)
end

.redis_connectionRedisClient

Deprecated.

Use redis_pool.with { |redis| … } instead for better performance

Creates a new Redis connection for fiber-safe operations. Used when concurrent operations need separate connections.



64
65
66
67
# File 'lib/fiber_job/queue.rb', line 64

def self.redis_connection
  # Create a new Redis connection for fiber-safe operations
  RedisClient.new(url: config.redis_url)
end

.redis_poolConnectionPool

Returns a Redis connection from the pool. This is the preferred method for getting Redis connections as it provides connection pooling and thread/fiber safety.



44
45
46
47
48
# File 'lib/fiber_job/queue.rb', line 44

def self.redis_pool
  @redis_pool ||= ConnectionPool.new(size: config.pool_size, timeout: 5) do
    RedisClient.new(url: config.redis_url)
  end
end

.schedule(queue_name, payload, scheduled_at) ⇒ Boolean

Schedules a job for execution at a specific time. Uses Redis sorted sets with timestamp as score for efficient time-based retrieval.

Examples:

Schedule a job

future_time = Time.now.to_f + 3600  # 1 hour from now
FiberJob::Queue.schedule(:default, job_data, future_time)


135
136
137
# File 'lib/fiber_job/queue.rb', line 135

def self.schedule(queue_name, payload, scheduled_at)
  redis_pool.with { |redis| redis.call("ZADD", "schedule:#{queue_name}", scheduled_at, JSON.dump(payload)) }
end

.schedule_priority(queue_name, payload, scheduled_at) ⇒ Boolean

Schedules a job with priority for execution at a specific time. Priority scheduled jobs are moved to the head of the queue when ready.



146
147
148
149
150
# File 'lib/fiber_job/queue.rb', line 146

def self.schedule_priority(queue_name, payload, scheduled_at)
  # Mark as priority retry for head-of-queue execution
  priority_payload = payload.merge('priority_retry' => true)
  redis_pool.with { |redis| redis.call("ZADD", "schedule:#{queue_name}", scheduled_at, JSON.dump(priority_payload)) }
end

.scheduled_jobs(queue_name) ⇒ Integer

Processes scheduled jobs that are ready for execution. Moves jobs from the scheduled set to the appropriate queue when their scheduled time has arrived. Uses Redis pipeline for better performance when processing multiple jobs.

Examples:

Process scheduled jobs

moved_count = FiberJob::Queue.scheduled_jobs(:default)  # Called by workers


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fiber_job/queue.rb', line 162

def self.scheduled_jobs(queue_name)
  redis_pool.with do |redis|
    now = Time.now.to_f
    jobs = redis.call("ZRANGEBYSCORE", "schedule:#{queue_name}", 0, now)

    return 0 if jobs.empty?

    # Use pipeline for better performance with multiple operations
    redis.pipelined do |pipeline|
      jobs.each do |job_json|
        pipeline.call("ZREM", "schedule:#{queue_name}", job_json)
        job_data = JSON.parse(job_json)

        # Use priority queue for retries
        if job_data['priority_retry']
          job_data.delete('priority_retry') # Clean up the flag
          pipeline.call("RPUSH", "queue:#{queue_name}", JSON.dump(job_data))
        else
          pipeline.call("LPUSH", "queue:#{queue_name}", JSON.dump(job_data))
        end
      end
    end

    jobs.size
  end
end

.stats(queue_name) ⇒ Hash

Returns statistics for the specified queue. Provides insight into queue depth, scheduled jobs, and processing status. Uses pipeline for efficient batch operations.

Examples:

Get queue statistics

stats = FiberJob::Queue.stats(:default)
puts "Pending: #{stats[:size]}, Scheduled: #{stats[:scheduled]}"


199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fiber_job/queue.rb', line 199

def self.stats(queue_name)
  redis_pool.with do |redis|
    results = redis.pipelined do |pipeline|
      pipeline.call("LLEN", "queue:#{queue_name}")
      pipeline.call("ZCARD", "schedule:#{queue_name}")
      pipeline.call("GET", "processing:#{queue_name}")
    end

    {
      size: results[0],
      scheduled: results[1],
      processing: results[2].to_i
    }
  end
end

.store_failed_job(job_data, error) ⇒ Integer

Stores a failed job with error information for later analysis. Failed jobs are stored with original job data plus failure metadata.

Examples:

Store failed job

begin
  job.perform
rescue => e
  FiberJob::Queue.store_failed_job(job_data, e)
end


228
229
230
231
232
233
234
235
# File 'lib/fiber_job/queue.rb', line 228

def self.store_failed_job(job_data, error)
  failed_job_data = job_data.merge({
                                     'failed_at' => Time.now.to_f,
                                     'error' => error.message,
                                     'backtrace' => error.backtrace&.first(10)
                                   })
  redis_pool.with { |redis| redis.call("LPUSH", 'failed', JSON.dump(failed_job_data)) }
end