Class: Jober::Queue

Inherits:
Task show all
Defined in:
lib/jober/queue.rb

Direct Known Subclasses

QueueBatch, SlowQueue, UniqueQueue

Class Attribute Summary collapse

Attributes inherited from AbstractTask

#finished, #stopped, #unique_id, #worker_id, #workers_count

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Task

extract_name, #perform

Methods inherited from AbstractTask

#after_execute, #after_initialize, #before_execute, #execute, get_interval, get_workers, #initialize, interval, manual!, manual?, #on_crashed, #run_loop, #sleeping, #stop!, workers

Methods included from Exception

#catch, #exception

Methods included from Logger

#logger, #logger=, #logger_tag

Constructor Details

This class inherits a constructor from Jober::AbstractTask

Class Attribute Details

.queue_nameObject

Returns the value of attribute queue_name.



9
10
11
# File 'lib/jober/queue.rb', line 9

def queue_name
  @queue_name
end

.queue_name_baseObject

Returns the value of attribute queue_name_base.



9
10
11
# File 'lib/jober/queue.rb', line 9

def queue_name_base
  @queue_name_base
end

Class Method Details

.dequeue(*args) ⇒ Object



25
26
27
# File 'lib/jober/queue.rb', line 25

def self.dequeue(*args)
  Jober.redis.lpush(queue_name, Jober.dump_args(*args))
end

.enqueue(*args) ⇒ Object



21
22
23
# File 'lib/jober/queue.rb', line 21

def self.enqueue(*args)
  Jober.redis.rpush(queue_name, Jober.dump_args(*args))
end

.inherited(base) ⇒ Object



3
4
5
6
# File 'lib/jober/queue.rb', line 3

def self.inherited(base)
  super
  base.set_queue_name(base.short_name)
end

.lenObject



29
30
31
# File 'lib/jober/queue.rb', line 29

def self.len
  Jober.redis.llen(self.queue_name)
end

.set_queue_name(q) ⇒ Object



11
12
13
14
# File 'lib/jober/queue.rb', line 11

def set_queue_name(q)
  @queue_name_base = q
  @queue_name = Jober.key("queue:#{q}")
end

Instance Method Details

#lenObject



33
34
35
# File 'lib/jober/queue.rb', line 33

def len
  self.class.len
end

#popObject



37
38
39
40
# File 'lib/jober/queue.rb', line 37

def pop
  res = Jober.redis.lpop(queue_name)
  Jober.load(res) if res
end

#queue_nameObject



17
18
19
# File 'lib/jober/queue.rb', line 17

def queue_name
  self.class.queue_name
end

#retry_eventObject



57
58
59
# File 'lib/jober/queue.rb', line 57

def retry_event
  self.class.dequeue(*@args) if @args
end

#retry_event_laterObject



61
62
63
# File 'lib/jober/queue.rb', line 61

def retry_event_later
  self.class.enqueue(*@args) if @args
end

#runObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/jober/queue.rb', line 42

def run
  cnt = 0
  while @args = pop
    perform(*@args)
    cnt += 1

    if stopped
      break
    end

    info { "processed #{cnt}" } if cnt % 1000 == 0
  end
  info { "processed total #{cnt}" }
end