Class: MemcacheQueue
- Inherits: Object
    - Object
        - MemcacheQueue
- Defined in: lib/memcache_queue.rb
Overview
It’s your job to catch any signals relating to process termination (INT, TERM, etc.) and call the shutdown method on each worker. If this isn’t done you run the risk of losing messages.
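A minimal sketch of the signal handling described above. StubWorker is a hypothetical stand-in so the example runs without a memcached server; it models only #shutdown, which in MemcacheQueue sets a flag. The workers array and the choice of INT and TERM are assumptions for illustration.

```ruby
# Hypothetical stand-in for MemcacheQueue; only #shutdown is modeled.
class StubWorker
  attr_reader :stopped

  def shutdown
    @stopped = true
  end
end

workers = [StubWorker.new, StubWorker.new]

# On INT or TERM, ask every worker to stop instead of letting the process die.
%w[INT TERM].each do |sig|
  Signal.trap(sig) { workers.each(&:shutdown) }
end
```

Because shutdown only sets an instance variable, it is safe to call from inside a trap handler. The workers then raise ShutdownError from their next add_msg or get_msg call.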
Defined Under Namespace
Classes: ShutdownError
Instance Attribute Summary
- #add_timeout ⇒ Object
  Returns the value of attribute add_timeout.
Instance Method Summary
- #add_msg(msg) ⇒ Object
  Lost-message risk: if more than add_timeout seconds pass between incr returning and the message being add()ed to memcache, the message could be lost.
- #create_queue ⇒ Object
- #get_msg ⇒ Object
  Lost-message risk: if the process dies between incr and state(:set), the message will be lost.
- #initialize(name, *args) ⇒ MemcacheQueue (constructor)
  A new instance of MemcacheQueue.
- #msgs_left(neg_value = false) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(name, *args) ⇒ MemcacheQueue
Returns a new instance of MemcacheQueue.
    # File 'lib/memcache_queue.rb', line 12
    def initialize(name, *args)
      @name = name or raise ArgumentError, "Must specify a unique name for each worker"
      @client = MemCache.new *args
      @first_key = state(:get, name)
      @add_timeout = 10
    end
Instance Attribute Details
#add_timeout ⇒ Object
Returns the value of attribute add_timeout.
    # File 'lib/memcache_queue.rb', line 10
    def add_timeout
      @add_timeout
    end
Instance Method Details
#add_msg(msg) ⇒ Object
Lost-message risk: if more than add_timeout seconds pass between incr returning and the message being add()ed to memcache, the message could be lost.
    # File 'lib/memcache_queue.rb', line 28
    def add_msg(msg)
      raise ShutdownError if @shutdown

      begin
        latest_msg ||= @client.incr('latest_added')
        @client.add(latest_msg.to_s, msg)
        @client.set("added_#{latest_msg}", true)
      rescue Exception
        warn "Error on add: #{$!.inspect}, retrying"
        sleep 1
        retry
      end
    end
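The three memcached calls in add_msg can be traced with a small in-memory stand-in. This is a sketch under stated assumptions, not the library's client: FakeClient is hypothetical and models only incr, add, and set, with simplified signatures.

```ruby
# Hypothetical in-memory stand-in for the memcached client.
class FakeClient
  attr_reader :store

  def initialize
    @store = { 'latest_added' => 0 }
  end

  # Increment a counter and return its new value.
  def incr(key)
    @store[key] += 1
  end

  # Like memcached's add: store the value only if the key is absent.
  def add(key, value)
    @store[key] = value unless @store.key?(key)
  end

  # Unconditional store.
  def set(key, value)
    @store[key] = value
  end
end

client = FakeClient.new
%w[first second].each do |msg|
  slot = client.incr('latest_added')  # reserve the next slot number
  client.add(slot.to_s, msg)          # store the payload under that number
  client.set("added_#{slot}", true)   # mark the slot as filled
end

client.store
# => {"latest_added"=>2, "1"=>"first", "added_1"=>true, "2"=>"second", "added_2"=>true}
```

The gap between incr and add is the window in which a message can be lost: a reader that has already claimed that slot number finds nothing under it until add completes.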
#create_queue ⇒ Object
    # File 'lib/memcache_queue.rb', line 19
    def create_queue
      @client.add('latest_added', '0', 0, true)
      @client.add('latest_read', '0', 0, true)
    end
#get_msg ⇒ Object
Lost-message risk: if the process dies between incr and state(:set), the message will be lost. This would require a hard process kill or a similar event.
    # File 'lib/memcache_queue.rb', line 45
    def get_msg
      raise ShutdownError if @shutdown

      if key = @first_key
        @first_key = nil
      else
        key = @client.incr('latest_read').to_s
      end

      start_time = Time.now
      loop do
        begin
          if msg = @client.get(key)
            begin
              @client.delete(key)
            rescue Exception
              warn "Error while deleting key #{$!.inspect}"
            end
            return msg
          end
        rescue Exception
          warn "Error on get, #{$!.inspect}, retrying"
        end
        state(:set, key) unless @state_set
        raise ShutdownError if @shutdown
        return nil if (Time.now - start_time) > @add_timeout and failed_add?(key)
        sleep 1
        start_time = Time.now if msgs_left(true) < 0
      end
    ensure
      state(:delete, key) if @state_set and ! @shutdown
    end
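A caller of get_msg has to handle three outcomes visible in the listing above: a message, nil (the claimed slot timed out without a message), and ShutdownError. The loop below is one possible sketch. FakeQueue, its nested ShutdownError (assumed here to descend from StandardError), and the consume helper are all hypothetical names introduced so the example runs without memcached.

```ruby
# Hypothetical stand-in that mimics the three outcomes of get_msg.
class FakeQueue
  class ShutdownError < StandardError; end

  def initialize(items)
    @items = items
    @shutdown = false
  end

  def shutdown
    @shutdown = true
  end

  def get_msg
    raise ShutdownError if @shutdown
    @items.shift
  end
end

# Hypothetical helper: yield each message until the queue is shut down.
def consume(queue)
  loop do
    msg = queue.get_msg
    next if msg.nil?  # slot timed out with no message; claim the next one
    yield msg
  end
rescue FakeQueue::ShutdownError
  # shutdown was requested; leave the loop cleanly
end

queue = FakeQueue.new(['a', nil, 'b'])
seen = []
consume(queue) do |msg|
  seen << msg
  queue.shutdown if msg == 'b'  # a real worker would do this from a signal handler
end

seen  # => ["a", "b"]
```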
#msgs_left(neg_value = false) ⇒ Object
    # File 'lib/memcache_queue.rb', line 78
    def msgs_left(neg_value = false)
      latest_added = @client.get('latest_added', true).to_i
      latest_read = @client.get('latest_read', true).to_i

      # check for 64bit boundary crossing
      if latest_added > 2**63 and latest_read < 2**62
        latest_read += 2**64
      elsif latest_read > 2**63 and latest_added < 2**62
        latest_added += 2**64
      end

      diff = latest_added - latest_read
      neg_value ? diff : [0, diff].max
    end
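The boundary check exists because memcached counters wrap around at 2**64. The arithmetic can be tried in isolation with the sketch below, which copies the logic of msgs_left but takes the two counters as plain integers. The name queue_depth is hypothetical.

```ruby
# Same arithmetic as msgs_left, with the counters passed in directly.
def queue_depth(latest_added, latest_read, neg_value = false)
  # If one counter has wrapped past 2**64 and the other has not,
  # lift the wrapped one back above the other before subtracting.
  if latest_added > 2**63 && latest_read < 2**62
    latest_read += 2**64
  elsif latest_read > 2**63 && latest_added < 2**62
    latest_added += 2**64
  end

  diff = latest_added - latest_read
  neg_value ? diff : [0, diff].max
end

queue_depth(10, 7)         # => 3   three messages waiting
queue_depth(7, 10)         # => 0   readers are ahead; clamped to zero
queue_depth(7, 10, true)   # => -3  raw difference when neg_value is true
queue_depth(5, 2**64 - 3)  # => 8   latest_added has wrapped, latest_read has not
```

In the last call, latest_added is treated as 2**64 + 5, so the difference is 8 rather than a huge negative number.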
#shutdown ⇒ Object
    # File 'lib/memcache_queue.rb', line 93
    def shutdown
      @shutdown = true
    end