Class: KueRuby

Inherits:
Object
Defined in:
lib/kue_ruby.rb,
lib/kue_ruby/version.rb

Overview

Interface to the Automattic Kue Redis store.

Defined Under Namespace

Classes: KueJob

Constant Summary

VERSION = '0.3.0'

Constructor Details

#initialize(options = {}) ⇒ KueRuby

Create a new client instance

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :redis (Redis)

    an instance of `redis` (required)

  • :prefix (String)

    namespace in Redis (default is 'q')

Raises:

  • (ArgumentError)

    if :redis is not provided


# File 'lib/kue_ruby.rb', line 16

def initialize(options = {})
  raise(ArgumentError, ':redis Redis required', caller) unless options[:redis]
  @redis = options[:redis]
  @prefix = options[:prefix] ? options[:prefix] : 'q'
  super()
end
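
The option handling above can be sketched without the gem or a running Redis server. This is only an illustration: `KueClientSketch` copies the constructor logic from the listing, and `FakeRedis` is a hypothetical placeholder for a real `Redis` instance. Neither name is part of the library.

```ruby
# Hypothetical stand-in for a real Redis connection (in real use, pass Redis.new).
FakeRedis = Class.new

# Mirrors the option handling of KueRuby#initialize shown in the listing above.
class KueClientSketch
  attr_reader :redis, :prefix

  def initialize(options = {})
    raise(ArgumentError, ':redis Redis required', caller) unless options[:redis]
    @redis = options[:redis]
    @prefix = options[:prefix] || 'q' # falls back to the default namespace
  end
end

default_client = KueClientSketch.new(redis: FakeRedis.new)
custom_client  = KueClientSketch.new(redis: FakeRedis.new, prefix: 'jobs')

# Omitting :redis raises ArgumentError; capture the message for inspection.
missing_redis_error =
  begin
    KueClientSketch.new
    nil
  rescue ArgumentError => e
    e.message
  end

puts default_client.prefix
puts custom_client.prefix
puts missing_redis_error
```

With the gem installed, the equivalent call would presumably be `KueRuby.new(redis: Redis.new)`, optionally with a `:prefix`.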

Instance Attribute Details

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



# File 'lib/kue_ruby.rb', line 7

def prefix
  @prefix
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



# File 'lib/kue_ruby.rb', line 7

def redis
  @redis
end

Instance Method Details

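#create_fifo(id = 1) ⇒ String

Create a FIFO id for the zset so that jobs keep their insertion order

Parameters:

  • id (Integer) (defaults to: 1)

    the job id

Returns:

  • (String)

    the zero-padded length of the id, a pipe, and the id itself (for example 01|1)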



# File 'lib/kue_ruby.rb', line 28

def create_fifo(id = 1)
  id_len = id.to_s.length.to_s
  len = 2 - id_len.length
  while len > 0
    id_len = '0' + id_len
    len -= 1
  end
  id_len.to_s + '|' + id.to_s
end
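
Why the length prefix matters: Redis orders sorted-set members with equal scores lexicographically, so plain numeric strings would put "10" before "9". The sketch below uses a hypothetical `fifo_key` helper that should behave like the padding loop above (written with `rjust` instead of a loop) and compares the two sort orders.

```ruby
# Equivalent to the padding loop in create_fifo: a two-character,
# zero-padded length prefix, a pipe, then the id itself.
def fifo_key(id)
  id.to_s.length.to_s.rjust(2, '0') + '|' + id.to_s
end

ids = [9, 10, 100]

# Plain strings sort lexicographically, which breaks numeric order.
raw_sorted = ids.map(&:to_s).sort

# Length-prefixed strings sort in the same order as the numeric ids.
fifo_sorted = ids.map { |id| fifo_key(id) }.sort

p raw_sorted
p fifo_sorted
```

Shorter ids always sort before longer ones because the length prefix is compared first, and ids of equal length compare correctly digit by digit.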

#create_job(options = {}) ⇒ KueJob

Enqueue a job

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :type (String)

    name of the queue for the job (required)

  • :data (Hash)

    hash of job data (required)

  • :max_attempts (Integer)

    max attempts for the job (default is 1)

  • :priority (Integer)

    default is 0/normal

Returns:

  • (KueJob, nil)

    the new job, or nil if enqueuing raised an error

Raises:

  • (ArgumentError)

    if :type or :data is missing


# File 'lib/kue_ruby.rb', line 47

def create_job(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  begin
    return create_job! options
  rescue
    return nil
  end
end
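
The listing above validates its arguments before the `begin` block, so a missing `:type` or `:data` still raises, while any error from the actual enqueue is swallowed and turned into `nil`. The sketch below models only that error handling. `enqueue!`, `enqueue`, and the `:simulate_failure` switch are hypothetical stand-ins, not part of the library, and no Redis calls are made.

```ruby
# Hypothetical stand-in for create_job!: validates, then either fails or
# returns a placeholder string instead of a KueJob.
def enqueue!(options = {})
  raise(ArgumentError, ':type String required') unless options[:type]
  raise(ArgumentError, ':data Hash required') unless options[:data]
  raise 'redis unavailable' if options[:simulate_failure] # hypothetical failure switch
  "job:#{options[:type]}"
end

# Hypothetical stand-in for create_job: same validation, but runtime
# failures inside the begin block become nil.
def enqueue(options = {})
  raise(ArgumentError, ':type String required') unless options[:type]
  raise(ArgumentError, ':data Hash required') unless options[:data]
  begin
    enqueue!(options)
  rescue StandardError
    nil
  end
end

succeeded = enqueue(type: 'email', data: { to: 'a@example.com' })
failed    = enqueue(type: 'email', data: {}, simulate_failure: true)

# Validation errors are raised outside the begin block, so they propagate.
validation_error =
  begin
    enqueue(data: {})
  rescue ArgumentError => e
    e.message
  end

p succeeded
p failed
p validation_error
```

Callers of the non-bang form therefore need to check for `nil`, whereas the bang form lets the underlying exception surface.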

#create_job!(options = {}) ⇒ KueJob

Enqueue a job, raising on failure

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :type (String)

    name of the queue for the job (required)

  • :data (Hash)

    hash of job data (required)

  • :max_attempts (Integer)

    max attempts for the job (default is 1)

  • :priority (Integer)

    default is 0/normal

Returns:

  • (KueJob)

    the new Kue job; any error raised while saving or enqueuing propagates to the caller

Raises:

  • (ArgumentError)

    if :type or :data is missing


# File 'lib/kue_ruby.rb', line 66

def create_job!(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  job = KueJob.new
  job.type = options[:type]
  job.data = options[:data]
  job.priority = options[:priority] ? options[:priority] : 0
  job.max_attempts = options[:max_attempts] ? options[:max_attempts] : 1
  job.state = 'inactive'
  job.created_at = Time.now
  job.backoff = { delay: 60 * 1000, type: 'exponential' }
  job.id = @redis.incr "#{@prefix}.ids"
  job.zid = create_fifo job.id
  @redis.sadd "#{@prefix}:job:types", job.type
  job.save! self
  @redis.zadd("#{@prefix}:jobs", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:inactive", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:#{job.type}:inactive", job.priority, job.zid)
  @redis.lpush("#{@prefix}:#{job.type}:jobs", 1)
  job
end
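
To make the key layout in the listing above easier to follow, here is a small sketch that only builds the key names for a given prefix and job type. `kue_keys` and its hash labels are hypothetical and exist purely for illustration; the key strings themselves are copied from the listing, including the dot separator in the id counter key.

```ruby
# Hypothetical helper: lists the Redis keys that create_job! touches,
# in the order the listing uses them. It performs no Redis calls.
def kue_keys(prefix, type)
  {
    id_counter:    "#{prefix}.ids",                   # INCR: next job id
    job_types:     "#{prefix}:job:types",             # SADD: set of known job types
    all_jobs:      "#{prefix}:jobs",                  # ZADD: every job, scored by priority
    inactive_jobs: "#{prefix}:jobs:inactive",         # ZADD: all inactive jobs
    type_inactive: "#{prefix}:jobs:#{type}:inactive", # ZADD: inactive jobs of this type
    type_signal:   "#{prefix}:#{type}:jobs"           # LPUSH: one entry per queued job
  }
end

keys = kue_keys('q', 'email')
keys.each { |label, key| puts "#{label}: #{key}" }
```

Each sorted set stores the FIFO id from `create_fifo` as the member and the job priority as the score, so jobs with the same priority come back in creation order.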