Class: Queuel::Base::Queue
- Inherits:
-
Object
- Object
- Queuel::Base::Queue
show all
- Extended by:
- Introspect
- Defined in:
- lib/queuel/base/queue.rb
Instance Method Summary
collapse
Methods included from Introspect
const_with_nesting, module_names
Constructor Details
#initialize(client, queue_name) ⇒ Queue
Returns a new instance of Queue.
6
7
8
9
|
# File 'lib/queuel/base/queue.rb', line 6
def initialize(client, queue_name)
self.client = client
self.name = queue_name
end
|
Instance Method Details
#max_pool_tasks ⇒ Object
39
40
41
|
# File 'lib/queuel/base/queue.rb', line 39
def max_pool_tasks
Queuel.max_pool_tasks || nil
end
|
#peek(options = {}) ⇒ Object
11
12
13
|
# File 'lib/queuel/base/queue.rb', line 11
def peek(options = {})
raise NotImplementedError, "must implement #peek"
end
|
#pop(options = {}, &block) ⇒ Object
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/queuel/base/queue.rb', line 19
def pop(options = {}, &block)
message_options, engine_options = Queuel::Hash.new(options).partition { |(k,_)| message_option_keys.include? k.to_s }
bare_message = pop_bare_message(engine_options)
unless bare_message.nil?
build_new_message(bare_message, message_options).tap { |message|
if block_given? && message.present?
message.delete if yield(message)
end
}
end
end
|
#push(message, options = {}) ⇒ Object
15
16
17
|
# File 'lib/queuel/base/queue.rb', line 15
def push(message, options = {})
raise NotImplementedError, "must implement #push"
end
|
#receive(options = {}, &block) ⇒ Object
31
32
33
|
# File 'lib/queuel/base/queue.rb', line 31
def receive(options = {}, &block)
poller_klass.new(self, block, options, thread_count).poll
end
|
#size ⇒ Object
35
36
37
|
# File 'lib/queuel/base/queue.rb', line 35
def size
raise NotImplementedError, "must implement #size"
end
|