Class: Queuel::Base::Queue

Inherits:
Object
  • Object
show all
Extended by:
Introspect
Defined in:
lib/queuel/base/queue.rb

Direct Known Subclasses

IronMq::Queue, Null::Queue, SNS::Queue, SQS::Queue

Instance Method Summary

Methods included from Introspect

const_with_nesting, module_names

Constructor Details

#initialize(client, queue_name) ⇒ Queue

Returns a new instance of Queue.



6
7
8
9
# File 'lib/queuel/base/queue.rb', line 6

# Stores the given client and queue name on the new instance by calling the
# +client=+ and +name=+ writers (both defined outside this listing).
#
# @param client [Object] handed to +self.client=+
#   (NOTE(review): presumably the engine-specific client — confirm)
# @param queue_name [Object] handed to +self.name=+
#   (NOTE(review): presumably a String queue name — confirm)
def initialize(client, queue_name)
  self.client = client
  self.name = queue_name
end

Instance Method Details

#max_pool_tasks ⇒ Object



39
40
41
# File 'lib/queuel/base/queue.rb', line 39

# Reads the library-wide +Queuel.max_pool_tasks+ setting.
#
# @return [Object, nil] the configured value, or +nil+ when the setting is
#   +nil+ or +false+
def max_pool_tasks
  configured = Queuel.max_pool_tasks
  # A falsy setting (nil or false) is always reported as nil.
  configured || nil
end

#peek(options = {}) ⇒ Object

Raises:

  • (NotImplementedError)


11
12
13
# File 'lib/queuel/base/queue.rb', line 11

# Abstract placeholder: this base implementation never returns and always
# raises, with a message telling implementers to provide +#peek+.
#
# @param options [Hash] accepted but not used by this base implementation
# @raise [NotImplementedError] always
def peek(options = {})
  raise NotImplementedError, "must implement #peek"
end

#pop(options = {}, &block) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/queuel/base/queue.rb', line 19

# Pops one message from the queue.
#
# +options+ is split in two: entries whose key (compared as a String) is
# listed in +message_option_keys+ are passed to +build_new_message+; all
# remaining entries are passed to +pop_bare_message+.
#
# When a block is given and the built message reports +present?+, the
# message is yielded; a truthy block result causes +message.delete+ to be
# called.
#
# @param options [Hash] combined message and engine options
# @yield [message] the freshly built message
# @return [Object, nil] the built message, or +nil+ when
#   +pop_bare_message+ returned +nil+
def pop(options = {}, &block)
  wrapped_options = Queuel::Hash.new(options)
  message_options, engine_options = wrapped_options.partition { |(key, _)|
    message_option_keys.include?(key.to_s)
  }

  bare_message = pop_bare_message(engine_options)
  return if bare_message.nil?

  message = build_new_message(bare_message, message_options)
  # Only consult the block for present messages; delete on a truthy answer.
  message.delete if block_given? && message.present? && yield(message)
  message
end

#push(message, options = {}) ⇒ Object

Raises:

  • (NotImplementedError)


15
16
17
# File 'lib/queuel/base/queue.rb', line 15

# Abstract placeholder: this base implementation never returns and always
# raises, with a message telling implementers to provide +#push+.
#
# @param message [Object] accepted but not used by this base implementation
# @param options [Hash] accepted but not used by this base implementation
# @raise [NotImplementedError] always
def push(message, options = {})
  raise NotImplementedError, "must implement #push"
end

#receive(options = {}, &block) ⇒ Object



31
32
33
# File 'lib/queuel/base/queue.rb', line 31

# Builds a poller from +poller_klass+ and runs it.
#
# The poller is constructed with this queue, the given block (as a Proc, or
# +nil+ when no block is passed), the options, and +thread_count+.
#
# @param options [Hash] forwarded unchanged to the poller
# @param block [Proc, nil] forwarded unchanged to the poller
# @return [Object] whatever the poller's +poll+ returns
def receive(options = {}, &block)
  poller = poller_klass.new(self, block, options, thread_count)
  poller.poll
end

#size ⇒ Object

Raises:

  • (NotImplementedError)


35
36
37
# File 'lib/queuel/base/queue.rb', line 35

# Abstract placeholder: this base implementation never returns and always
# raises, with a message telling implementers to provide +#size+.
#
# @raise [NotImplementedError] always
def size
  raise NotImplementedError, "must implement #size"
end