Class: Qu::Backend::Memory

Inherits:
Base
  • Object
show all
Defined in:
lib/qu/backend/memory.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Memory

Returns a new instance of Memory.



7
8
9
10
# File 'lib/qu/backend/memory.rb', line 7

# Sets up empty in-memory storage for workers and queues.
#
# @workers starts as an empty Hash. @queues is a Hash whose default block
# creates and stores a container the first time a key is read: a Hash for
# the "failed" key, an Array for every other key.
def initialize
  @workers = {}
  @queues = Hash.new do |store, name|
    store[name] = name == "failed" ? {} : []
  end
end

Instance Method Details

#clear(queue = nil) ⇒ Object



24
25
26
27
28
29
30
31
# File 'lib/qu/backend/memory.rb', line 24

# Deletes one or more queues from the in-memory store.
#
# @param queue [Object, Array, nil] a queue name or list of names; when
#   nil (or false) every name returned by #queues plus 'failed' is cleared
# @return [Array] the list of queue names that were iterated
def clear(queue = nil)
  targets = queue || (queues + ['failed'])
  logger.info { "Clearing queues: #{targets.inspect}" }
  # Array() lets a single name and a list of names share one code path.
  Array(targets).each do |name|
    logger.debug "Clearing queue #{name}"
    @queues.delete(name)
  end
end

#clear_workers ⇒ Object



43
44
45
# File 'lib/qu/backend/memory.rb', line 43

# Forgets every registered worker by replacing the registry with a new,
# empty Hash.
#
# @return [Hash] the new empty registry
def clear_workers
  @workers = {}
end

#completed(payload) ⇒ Object



87
# File 'lib/qu/backend/memory.rb', line 87

# Hook invoked with a finished payload. This backend does nothing with it.
#
# @param payload [Object] ignored
# @return [nil]
def completed(payload)
  # Intentionally empty.
end

#connection ⇒ Object



12
13
14
# File 'lib/qu/backend/memory.rb', line 12

# Returns the value held in @connection, first setting it to true when it
# is currently nil or false.
#
# @return [Object] the existing truthy @connection, otherwise true
def connection
  @connection || (@connection = true)
end

#enqueue(payload) ⇒ Object



53
54
55
56
57
58
# File 'lib/qu/backend/memory.rb', line 53

# Assigns a freshly generated GUID to the payload and appends it to the
# queue named by payload.queue.
#
# @param payload [Object] must respond to #id= and #queue
# @return [Object] the same payload, now carrying its new id
def enqueue(payload)
  payload.tap do |job|
    job.id = SimpleUUID::UUID.new.to_guid
    @queues[job.queue].push(job)
    logger.debug { "Enqueued job #{job}" }
  end
end

#failed(payload, error) ⇒ Object



69
70
71
# File 'lib/qu/backend/memory.rb', line 69

# Records a payload in the "failed" collection, keyed by its id.
#
# @param payload [Object] must respond to #id
# @param error [Object] accepted but not used by this method
# @return [Object] the stored payload
def failed(payload, error)
  @queues["failed"].store(payload.id, payload)
end

#get_queue_by_klass(klass) ⇒ Object



93
94
95
96
# File 'lib/qu/backend/memory.rb', line 93

def get_queue_by_klass(klass)
  payload = Payload.new(:klass => klass)
  get_queue_by_name(payload.queue)
end

#get_queue_by_name(queue = 'default') ⇒ Object



89
90
91
# File 'lib/qu/backend/memory.rb', line 89

# Returns the container stored for a queue name.
#
# @param queue [#to_s] queue name; converted with #to_s before lookup
# @return [Object] the value held in @queues under that name
def get_queue_by_name(queue = 'default')
  name = queue.to_s
  @queues[name]
end

#length(queue = 'default') ⇒ Object



20
21
22
# File 'lib/qu/backend/memory.rb', line 20

# Reports how many entries a queue currently holds.
#
# The name is normalised with #to_s, matching #get_queue_by_name, so a
# Symbol such as :default counts the "default" queue instead of reading
# (and auto-creating) a separate Symbol-keyed entry.
#
# @param queue [#to_s] queue name
# @return [Integer] number of entries stored under that name
def length(queue = 'default')
  @queues[queue.to_s].length
end

#queues ⇒ Object



16
17
18
# File 'lib/qu/backend/memory.rb', line 16

# Lists the names of queues that currently hold at least one entry,
# leaving out the "failed" key.
#
# @return [Array] queue names in the insertion order of @queues
def queues
  active = @queues.select { |name, jobs| name != "failed" && !jobs.empty? }
  active.keys
end

#register_worker(worker) ⇒ Object



33
34
35
36
# File 'lib/qu/backend/memory.rb', line 33

# Adds a worker to the registry, keyed by its id.
#
# The stored value is the worker's attributes merged with an :id entry.
#
# @param worker [Object] must respond to #id and #attributes
# @return [Hash] the attributes Hash that was stored
def register_worker(worker)
  logger.debug "Registering worker #{worker.id}"
  record = worker.attributes.merge(:id => worker.id)
  @workers.store(worker.id, record)
end

#release(payload) ⇒ Object



83
84
85
# File 'lib/qu/backend/memory.rb', line 83

# Puts a payload back at the end of the queue named by payload.queue.
#
# @param payload [Object] must respond to #queue
# @return [Array] the queue the payload was appended to
def release(payload)
  @queues[payload.queue].push(payload)
end

#requeue(id) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/qu/backend/memory.rb', line 73

# Moves a failed job back onto its original queue.
#
# @param id [Object] key of the job inside the "failed" collection
# @return [Object, false] the requeued payload, or false when no failed
#   job is stored under that id
def requeue(id)
  logger.debug "Requeuing job #{id}"
  payload = @queues["failed"].delete(id)
  return false unless payload

  @queues[payload.queue] << payload
  payload
end

#reserve(worker, options = {:block => true}) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/qu/backend/memory.rb', line 60

# Takes the next job from the first non-empty queue the worker listens on.
#
# Queues are checked in the order given by worker.queues. With the default
# options the method keeps polling until a job appears. Passing
# :block => false makes it do a single pass and return nil when every queue
# is empty; previously the option was ignored and the call never returned.
#
# @param worker [Object] must respond to #queues with a list of queue names
# @param options [Hash] :block (default true) — set to false for a
#   non-blocking single pass
# @return [Object, nil] the reserved job, or nil for a non-blocking miss
def reserve(worker, options = {:block => true})
  # fetch keeps an options Hash without a :block key blocking, as before.
  blocking = options.fetch(:block, true)
  loop do
    worker.queues.each do |queue|
      logger.debug { "Reserving job in queue #{queue}" }
      return @queues[queue].shift unless @queues[queue].empty?
    end
    return nil unless blocking
  end
end

#unregister_worker(worker) ⇒ Object



38
39
40
41
# File 'lib/qu/backend/memory.rb', line 38

# Removes a worker from the registry.
#
# @param worker [Object] must respond to #id
# @return [Object, nil] the entry that was stored under the worker's id,
#   or nil when nothing was stored
def unregister_worker(worker)
  logger.debug("Unregistering worker #{worker.id}")
  removed = @workers.delete(worker.id)
  removed
end

#workers ⇒ Object



47
48
49
50
51
# File 'lib/qu/backend/memory.rb', line 47

# Builds a Qu::Worker for every entry in the registry.
#
# NOTE(review): each registry entry reaches Qu::Worker.new as a two-element
# [id, attributes] Array, not as the attributes Hash alone — confirm that
# this is the argument shape Qu::Worker.new expects.
#
# @return [Array] one Qu::Worker per registered entry, in registry order
def workers
  @workers.to_a.map { |pair| Qu::Worker.new(pair) }
end