Class: Qyu::Queue::Memory::Adapter

Inherits:
Base
  • Object
show all
Defined in:
lib/qyu/queue/memory/adapter.rb

Overview

Qyu::Queue::Memory::Adapter

Constant Summary collapse

TYPE =
:memory

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#enqueue_tasks

Constructor Details

#initialize(_config) ⇒ Adapter

Returns a new instance of Adapter.



10
11
12
13
14
# File 'lib/qyu/queue/memory/adapter.rb', line 10

# Sets up the empty in-memory state of the adapter.
#
# @param _config [Object] adapter configuration; not read by this adapter
def initialize(_config)
  # Queues known to this adapter, keyed by queue name.
  @queues = {}
  # NOTE(review): presumably holds background threads started by the
  # adapter; their use is not visible from here - confirm.
  @threads = []
  # Acknowledgement flags keyed by message id; unknown ids read as false.
  @temp_store = Hash.new(false)
end

Class Method Details

.valid_config?(_config) ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/qyu/queue/memory/adapter.rb', line 16

# Reports whether the given configuration is acceptable for this adapter.
#
# No validation is implemented yet, so every configuration is accepted.
#
# @param _config [Object] adapter configuration (currently ignored)
# @return [Boolean] always true
def self.valid_config?(_config)
  # TODO: implement real validation of the configuration.
  true
end

Instance Method Details

#acknowledge_message(_queue_name, message_id) ⇒ Object



48
49
50
# File 'lib/qyu/queue/memory/adapter.rb', line 48

# Marks a message as acknowledged by flagging its id in the
# acknowledgement store.
#
# @param _queue_name [Object] unused
# @param message_id [Object] id of the message to acknowledge
# @return [true]
def acknowledge_message(_queue_name, message_id)
  @temp_store.store(message_id, true)
end

#enqueue_task(queue_name, task_id) ⇒ Object



21
22
23
# File 'lib/qyu/queue/memory/adapter.rb', line 21

# Appends a message referencing the given task to the named queue.
#
# @param queue_name [String] name of the queue to append to
# @param task_id [Object] id of the task to enqueue
# @return [Object] whatever the queue's << returns
def enqueue_task(queue_name, task_id)
  # Messages are plain hashes with a string 'task_id' key.
  payload = { 'task_id' => task_id }
  queue(queue_name) << payload
end

#enqueue_task_to_failed_queue(queue_name, task_id) ⇒ Object



25
26
27
28
# File 'lib/qyu/queue/memory/adapter.rb', line 25

# Enqueues a task on the failed-task counterpart of a queue, whose name
# is the original queue name with '-failed' appended.
#
# @param queue_name [String] name of the original queue
# @param task_id [Object] id of the failed task
# @return [Object] the result of enqueue_task
def enqueue_task_to_failed_queue(queue_name, task_id)
  enqueue_task(queue_name + '-failed', task_id)
end

#fetch_next_message(queue_name) ⇒ Hash

fetch_next_message

TODO: Note the ugliness of the `while … empty?` polling loop; it is there mainly because of the issue described at stackoverflow.com/q/11660253.

Parameters:

  • queue_name (String)

Returns:

  • (Hash)

    the fetched message: its generated 'id' (the id to pass when acknowledging it) and its 'task_id'



37
38
39
40
41
42
43
44
45
46
# File 'lib/qyu/queue/memory/adapter.rb', line 37

# Waits for a message on the named queue, removes it and returns it
# tagged with a freshly generated message id.
#
# @param queue_name [String] name of the queue to read from
# @return [Hash] 'id' (generated message id) and 'task_id' of the message
def fetch_next_message(queue_name)
  # Poll once per second instead of blocking on the queue
  # (see stackoverflow.com/q/11660253).
  while queue(queue_name).empty?
    sleep(1)
  end

  # Non-blocking pop.
  # NOTE(review): if another consumer drains the queue between the check
  # above and this pop, a Thread::Queue would raise ThreadError - confirm
  # whether multiple consumers are expected.
  task = queue(queue_name).pop(true)
  receipt_id = Qyu::Utils.uuid

  # NOTE(review): presumably puts the message back unless it gets
  # acknowledged; verify against schedule_requeue.
  schedule_requeue(task, receipt_id, queue_name)

  { 'id' => receipt_id, 'task_id' => task['task_id'] }
end

#queues ⇒ Object



52
53
54
55
56
# File 'lib/qyu/queue/memory/adapter.rb', line 52

# Summarises every known queue.
#
# @return [Array<Hash>] one { name:, messages: } hash per queue, where
#   messages is the queue's size (nil when no queue object is stored)
def queues
  @queues.each_with_object([]) do |(name, pending), summary|
    summary << { name: name, messages: pending&.size }
  end
end

#size(queue_name) ⇒ Object



58
59
60
# File 'lib/qyu/queue/memory/adapter.rb', line 58

# Returns the number of messages currently in the named queue.
#
# @param queue_name [String] name of the queue to measure
# @return [Integer] the queue's size
def size(queue_name)
  pending = queue(queue_name)
  pending.size
end