Module: Resque::Integration::Ordered::Overrides

Defined in:
lib/resque/integration/ordered.rb

Instance Method Summary collapse

Instance Method Details

#enqueue(*args) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/resque/integration/ordered.rb', line 83

# Enqueues the job via +super+, then appends this call's arguments to the
# Redis list stored under +ordered_queue_key+ for the enqueued meta id.
#
# If +uniqueness+ is set and already maps these arguments to an ordered meta
# id, the metadata fetched for that id is returned and nothing is appended.
#
# @param args [Array] arguments of the job
# @return [Object] the freshly saved ordered metadata, or whatever +get_meta+
#   returns for the already registered ordered meta id
def enqueue(*args)
  meta = super

  if uniqueness
    existing_id = uniqueness.ordered_meta_id(meta.meta_id, args)
    return get_meta(existing_id) if existing_id
  end

  new_meta = ::Resque::Plugins::Meta::Metadata.new(
    'meta_id' => ordered_meta_id(args),
    'job_class' => self
  )
  new_meta.save

  if uniqueness
    uniqueness.set(meta.meta_id, args, new_meta.meta_id)
  end

  # The ordered meta id travels as the first element of the stored payload.
  args.unshift(new_meta.meta_id)
  payload = ::Resque.encode(args)
  queue_key = ordered_queue_key(meta.meta_id)

  redis = ::Resque.redis
  redis.rpush(queue_key, payload)
  redis.expire(queue_key, ARGS_EXPIRATION)

  new_meta
end

#perform(meta_id) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/resque/integration/ordered.rb', line 104

# Pops stored argument sets from the Redis list under +ordered_queue_key+ one
# at a time and runs +execute+ for each, updating the ordered metadata whose
# id is the first element of every decoded entry.
#
# Stops when the list is empty. When +max_iterations+ is set, that many
# entries have been handled and the list still has entries, the result of
# +continue+ is returned instead.
#
# @param meta_id [Object] meta id used to build the list key
# @return [Object, nil] the value of +continue+ when the iteration limit is
#   reached with entries remaining, otherwise nil
def perform(meta_id, *)
  queue_key = ordered_queue_key(meta_id)
  iteration = 1

  while (raw_entry = ::Resque.redis.lpop(queue_key))
    entry = ::Resque.decode(raw_entry)
    # First element is the ordered meta id; the remainder are the job arguments.
    ordered_meta = get_meta(entry.shift)
    ordered_meta.start!

    begin
      execute(ordered_meta, *entry)
    rescue Exception
      # Record the failure for any exception, then let it propagate unchanged.
      ordered_meta.fail!
      raise
    ensure
      if uniqueness
        uniqueness.remove(meta_id, entry)
      end
    end

    ordered_meta.finish!

    iteration += 1
    limit_reached = max_iterations && iteration > max_iterations
    return continue if limit_reached && ordered_queue_size(meta_id) > 0
  end
end