Class: Cola::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/cola/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/cola/queue.rb', line 8

# Sets up a queue handle that works on three Redis lists: the main queue,
# a processing list and a dead-letter list.
#
# @param options [Hash] configuration; the hash is only read, never modified
# @option options [Object] :redis client to use (falls back to Redis.current)
# @option options [String] :prefix key prefix (falls back to "rq_")
# @option options [Object] :deadletter stored as given in @deadletter
# @option options [String, Symbol] :queue_name base name for the three keys;
#   when omitted, the current Unix timestamp (in seconds) is used instead
# @option options [Integer] :timeout default pop timeout (falls back to 0)
# @option options [Integer] :retries retry limit used by #process (falls back to 0)
def initialize(options = {})
	@redis = options[:redis] || Redis.current
	prefix = options[:prefix] || "rq_"
	@deadletter = options[:deadletter] # nil (falsy) unless supplied

	# Without an explicit name, derive one from the current Unix timestamp.
	base_name = options[:queue_name] || Time.new.to_i
	@queue_name = "#{prefix}#{base_name}"
	@process_queue_name = "#{prefix}process_#{base_name}"
	@deadletter_queue_name = "#{prefix}deadletter_#{base_name}"

	# Plain `||` rather than `||=`: defaults must not be written back into the
	# caller's hash (which would also raise on a frozen hash).
	@timeout = options[:timeout] || 0
	@retries = options[:retries] || 0
end

Instance Attribute Details

#deadletter_queue_name ⇒ Object (readonly)

Returns the value of attribute deadletter_queue_name.



6
7
8
# File 'lib/cola/queue.rb', line 6

# Redis key of the dead-letter list, as built in the constructor from the
# prefix, the literal "deadletter_" and the queue name.
def deadletter_queue_name
  @deadletter_queue_name
end

#process_queue_name ⇒ Object (readonly)

Returns the value of attribute process_queue_name.



5
6
7
# File 'lib/cola/queue.rb', line 5

# Redis key of the processing list, as built in the constructor from the
# prefix, the literal "process_" and the queue name.
def process_queue_name
  @process_queue_name
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



4
5
6
# File 'lib/cola/queue.rb', line 4

# Redis key of the main queue list, as built in the constructor from the
# prefix and the queue name.
def queue_name
  @queue_name
end

Instance Method Details

#commit ⇒ Object



80
81
82
83
# File 'lib/cola/queue.rb', line 80

# Acknowledges a message by removing the entry at the head of the
# processing list.
#
# @return [true] always, whether or not an entry was actually removed
def commit
	@redis.lpop(@process_queue_name)
	true
end

#deadletter_count ⇒ Object



58
59
60
# File 'lib/cola/queue.rb', line 58

# Length of the dead-letter list.
#
# @return [Object] whatever LLEN on the dead-letter key returns
def deadletter_count
	@redis.llen(@deadletter_queue_name)
end

#destroy ⇒ Object

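Not thread-safe: the processing, main and dead-letter lists are deleted one after another by separate Redis commands.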



44
45
46
47
48
# File 'lib/cola/queue.rb', line 44

# Deletes all three Redis lists belonging to this queue: the processing
# list first, then the main queue, then the dead-letter list.
#
# Each step is a separate call, so the three deletions are not atomic.
# The return value is that of the last call (flush_deadletter).
def destroy
	flush_processing
	flush
	flush_deadletter
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/cola/queue.rb', line 50

# Whether the main queue currently holds no entries.
#
# @return [Boolean] true when #len is not greater than zero
def empty?
	!len.positive?
end

#flush ⇒ Object



35
36
37
# File 'lib/cola/queue.rb', line 35

# Deletes the main queue list from Redis.
#
# @return [Object] whatever DEL on the queue key returns
def flush
	@redis.del(@queue_name)
end

#flush_deadletter ⇒ Object



39
40
41
# File 'lib/cola/queue.rb', line 39

# Deletes the dead-letter list from Redis.
#
# @return [Object] whatever DEL on the dead-letter key returns
def flush_deadletter
	@redis.del(@deadletter_queue_name)
end

#flush_processing ⇒ Object



31
32
33
# File 'lib/cola/queue.rb', line 31

# Deletes the processing list from Redis.
#
# @return [Object] whatever DEL on the processing key returns
def flush_processing
	@redis.del(@process_queue_name)
end

#len ⇒ Object



27
28
29
# File 'lib/cola/queue.rb', line 27

# Length of the main queue list.
#
# @return [Object] whatever LLEN on the queue key returns
def len
	@redis.llen(@queue_name)
end

#pop(non_block = false, timeout: @timeout) ⇒ Object



73
74
75
76
77
78
# File 'lib/cola/queue.rb', line 73

# Pops the next envelope and returns only its message.
#
# @param non_block [Boolean] forwarded to pop_with_envelope
# @param timeout forwarded to pop_with_envelope (defaults to @timeout)
# @return [Object, nil] the envelope's message, or nil when
#   pop_with_envelope yields no envelope
def pop(non_block = false, timeout: @timeout)
	envelope = pop_with_envelope(non_block, timeout: timeout)
	envelope&.message
end

#pop_with_envelope(non_block = false, timeout: @timeout) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/cola/queue.rb', line 119

# Moves the next payload from the main queue onto the processing list and
# returns it wrapped in a Cola::Envelope.
#
# Uses RPOPLPUSH when non_block is truthy and BRPOPLPUSH otherwise.
# An expired envelope is committed (removed from the processing list) and
# nil is returned, so nil does not distinguish "nothing popped" from
# "popped an expired message".
#
# @param non_block [Boolean] when truthy, do not wait for a message
# @param timeout timeout handed to BRPOPLPUSH (defaults to @timeout)
# @return [Cola::Envelope, nil] the envelope, or nil when none is available
#   or the popped one has expired
def pop_with_envelope(non_block = false, timeout: @timeout)
	payload =
		if non_block
			@redis.rpoplpush(@queue_name, @process_queue_name)
		else
			# Keyword form on purpose: the positional integer timeout is a legacy
			# redis-rb calling convention, while `timeout:` is the supported one.
			@redis.brpoplpush(@queue_name, @process_queue_name, timeout: timeout)
		end

	# NOTE(review): relies on Envelope.from_payload coping with a nil payload
	# (presumably returning nil) — confirm against Cola::Envelope.
	env = Cola::Envelope.from_payload(payload)
	return nil if env.nil?
	return env unless env.expired?

	# Expired: drop it from the processing list and report nothing.
	commit
	nil
end

#process(non_block = false, timeout: @timeout) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/cola/queue.rb', line 85

# Pops envelopes in a loop and yields each envelope's message to the block.
#
# A truthy block result commits the message (see #commit). A falsy result,
# or calling without a block, leaves it uncommitted. The loop stops when
# pop_with_envelope returns nil or, in non-blocking mode, when the main
# queue is empty.
#
# If an error is raised while an envelope is in hand:
# * with @retries non-zero and the envelope's retry count below @retries,
#   the count is incremented, the envelope is pushed back onto the main
#   queue and the loop continues;
# * with @retries non-zero and the limit reached, the envelope is
#   dead-lettered and Cola::RetryError is raised;
# * with @retries zero, the envelope is dead-lettered and the original
#   error is re-raised.
#
# @param non_block [Boolean] forwarded to pop_with_envelope
# @param timeout forwarded to pop_with_envelope (defaults to @timeout)
# @yield [message] the popped envelope's message
# @raise [Cola::RetryError] when the retry limit is exhausted
# @raise [StandardError] the original error when retries are disabled or
#   no envelope had been popped
def process(non_block = false, timeout: @timeout)
	loop do
		obj = pop_with_envelope(non_block, timeout: timeout)
		# ret stays nil when nothing was popped or no block was given.
		ret = yield obj.message if !obj.nil? && block_given?
		commit if ret
		break if obj.nil? || (non_block && empty?)
	rescue => exc 
		# The pop itself failed: there is no envelope to requeue or dead-letter.
		raise if obj.nil? 
		# Retries enabled: requeue until the envelope's retry count reaches @retries.
		if @retries != 0
			if obj.retries < @retries
				obj.inc_retries(reason: exc.message)
				# NOTE(review): this re-push does not remove the entry from the
				# processing list here — confirm whether that is cleaned up elsewhere.
				push(obj)
			else 
				mark_as_deadletter(obj)
				raise Cola::RetryError.new(obj, exc)
			end
		else 
			mark_as_deadletter(obj)
			raise 
		end
	end
end

#processing_count ⇒ Object



54
55
56
# File 'lib/cola/queue.rb', line 54

# Length of the processing list.
#
# @return [Object] whatever LLEN on the processing key returns
def processing_count
	@redis.llen(@process_queue_name)
end

#push(obj, ttl: 0) ⇒ Object Also known as: <<



62
63
64
65
66
67
68
69
70
71
# File 'lib/cola/queue.rb', line 62

# Enqueues a message at the head of the main queue list.
#
# A Cola::Envelope is pushed as-is and the ttl argument is ignored for it.
# Anything else is wrapped in a new envelope whose ttl is set to the given value.
#
# @param obj [Object, Cola::Envelope] message or ready-made envelope
# @param ttl [Object] ttl assigned to a newly created envelope
# @return [Object] whatever LPUSH returns
def push(obj, ttl: 0)
	envelope = obj
	unless envelope.is_a?(Cola::Envelope)
		envelope = Cola::Envelope.new(obj)
		envelope.ttl = ttl
	end

	@redis.lpush(@queue_name, envelope.to_json)
end

#refill ⇒ Object

Not thread-safe: entries are moved back one at a time, each with a separate LPOP and RPUSH.



110
111
112
113
114
115
# File 'lib/cola/queue.rb', line 110

# Moves every entry of the processing list back to the tail of the main
# queue, one entry at a time, until the processing list is empty.
#
# @return [true] always
def refill
	loop do
		payload = @redis.lpop(@process_queue_name)
		break unless payload

		@redis.rpush(@queue_name, payload)
	end
	true
end