Class: MemcacheQueue

Inherits:
Object
Defined in:
lib/memcache_queue.rb

Overview

It’s your job to catch any signals relating to process termination (INT, TERM, etc.) and call the shutdown method on each worker. If this isn’t done you run the risk of losing messages.
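
As a rough sketch of the signal handling described above, the handlers below call shutdown on every worker when INT or TERM arrives. StubWorker, shutdown_all and trap_termination_signals are hypothetical names used only for illustration; in real use the workers would be MemcacheQueue instances.

```ruby
# Hypothetical stand-in for a MemcacheQueue worker; only #shutdown matters here.
class StubWorker
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  # Mirrors MemcacheQueue#shutdown, which only sets a flag.
  def shutdown
    @stopped = true
  end
end

# Ask every worker to stop (helper name is an assumption, not part of the library).
def shutdown_all(workers)
  workers.each(&:shutdown)
end

# Install one handler per termination signal.
def trap_termination_signals(workers, signals = %w[INT TERM])
  signals.each do |sig|
    Signal.trap(sig) { shutdown_all(workers) }
  end
end

workers = [StubWorker.new, StubWorker.new]
trap_termination_signals(workers)
```

Because shutdown only sets a flag, it should be safe to call from inside a trap handler; each worker then raises ShutdownError the next time it checks that flag.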

Defined Under Namespace

Classes: ShutdownError

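Instance Attribute Summary

  • #add_timeout ⇒ Object

Instance Method Summary

  • #add_msg(msg) ⇒ Object
  • #create_queue ⇒ Object
  • #get_msg ⇒ Object
  • #initialize(name, *args) ⇒ MemcacheQueue (constructor)
  • #msgs_left(neg_value = false) ⇒ Object
  • #shutdown ⇒ Object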

Constructor Details

#initialize(name, *args) ⇒ MemcacheQueue

Returns a new instance of MemcacheQueue.



# File 'lib/memcache_queue.rb', line 12

def initialize(name, *args)
  @name = name or raise ArgumentError, "Must specify a unique name for each worker"
  @client = MemCache.new(*args)
  @first_key = state(:get, name)
  @add_timeout = 10
end

Instance Attribute Details

#add_timeout ⇒ Object

Returns the value of attribute add_timeout.



# File 'lib/memcache_queue.rb', line 10

def add_timeout
  @add_timeout
end

Instance Method Details

#add_msg(msg) ⇒ Object

! lost message potential ! If more than add_timeout seconds elapse between incr returning and the message being add()ed to memcache, the message could be lost.

Raises:

  • (ShutdownError) if shutdown has already been called



# File 'lib/memcache_queue.rb', line 28

def add_msg(msg)
  raise ShutdownError  if @shutdown

  begin
    latest_msg ||= @client.incr('latest_added')
    @client.add(latest_msg.to_s, msg)
    @client.set("added_#{latest_msg}", true)
  rescue Exception
    warn "Error on add: #{$!.inspect}, retrying"
    sleep 1
    retry
  end
end
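
To make the warning above concrete, here is a minimal sketch of the three steps add_msg performs, run against an in-memory stand-in. FakeStore and enqueue are hypothetical names, and the fake only imitates the memcache calls loosely. The risky window sits between step 1 and step 2: the slot number is already reserved, but the message is not yet stored.

```ruby
# In-memory stand-in for the memcache calls add_msg makes.
# Hypothetical and simplified; this is not the MemCache client.
class FakeStore
  def initialize
    # In the real queue, create_queue seeds this counter.
    @data = { 'latest_added' => 0 }
  end

  def incr(key)
    @data[key] += 1
  end

  # Like memcache add, refuses to overwrite an existing key.
  def add(key, value)
    return false if @data.key?(key)

    @data[key] = value
    true
  end

  def set(key, value)
    @data[key] = value
  end

  def get(key)
    @data[key]
  end
end

# Same three steps as add_msg, without the retry loop or shutdown check.
def enqueue(store, msg)
  slot = store.incr('latest_added')  # 1. reserve the next slot number
  store.add(slot.to_s, msg)          # 2. store the message under that slot
  store.set("added_#{slot}", true)   # 3. record that the add completed
  slot
end

store = FakeStore.new
first = enqueue(store, 'hello')   # slot 1
second = enqueue(store, 'world')  # slot 2
```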

#create_queue ⇒ Object



# File 'lib/memcache_queue.rb', line 19

def create_queue
  @client.add('latest_added', '0', 0, true)
  @client.add('latest_read', '0', 0, true)
end

#get_msg ⇒ Object

! lost message potential ! If the process dies between incr and state(:set), the message will be lost. This should only happen after a hard process kill or a similar event.



# File 'lib/memcache_queue.rb', line 45

def get_msg
  raise ShutdownError  if @shutdown

  if key = @first_key
    @first_key = nil
  else
    key = @client.incr('latest_read').to_s
  end

  start_time = Time.now
  loop do
    begin
      if msg = @client.get(key)
        begin
          @client.delete(key)
        rescue Exception
          warn "Error while deleting key #{$!.inspect}"
        end
        return msg
      end
    rescue Exception
      warn "Error on get, #{$!.inspect}, retrying"
    end
    state(:set, key)  unless @state_set
    raise ShutdownError  if @shutdown
    return nil  if (Time.now - start_time) > @add_timeout and failed_add?(key)
    sleep 1
    start_time = Time.now  if msgs_left(true) < 0
  end
ensure
  state(:delete, key)  if @state_set and ! @shutdown
end
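
A caller has to handle both outcomes of get_msg: a nil return, meaning the message for that key never arrived, and ShutdownError. The loop below is one possible shape for a consumer, not the library's own. ScriptedQueue and consume are hypothetical, and the top-level ShutdownError stands in for MemcacheQueue::ShutdownError, which is assumed here to be a StandardError.

```ruby
# Stand-in for MemcacheQueue::ShutdownError (assumed to be a StandardError).
class ShutdownError < StandardError; end

# Hypothetical scripted queue: get_msg returns each scripted value in turn,
# then raises ShutdownError as if shutdown had been called.
class ScriptedQueue
  def initialize(results)
    @results = results.dup
  end

  def get_msg
    raise ShutdownError if @results.empty?

    @results.shift
  end
end

# Collect messages until shutdown, skipping slots whose message never arrived.
def consume(queue)
  handled = []
  loop do
    msg = queue.get_msg
    next if msg.nil? # nil: nothing was stored for this slot, move on

    handled << msg
  end
rescue ShutdownError
  handled
end

received = consume(ScriptedQueue.new(['a', nil, 'b']))
```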

#msgs_left(neg_value = false) ⇒ Object



# File 'lib/memcache_queue.rb', line 78

def msgs_left(neg_value = false)
  latest_added = @client.get('latest_added', true).to_i
  latest_read = @client.get('latest_read', true).to_i

  # check for 64bit boundary crossing
  if latest_added > 2**63 and latest_read < 2**62
    latest_read += 2**64
  elsif latest_read > 2**63 and latest_added < 2**62
    latest_added += 2**64
  end

  diff = latest_added - latest_read
  neg_value ? diff : [0, diff].max
end
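
The 64-bit boundary check is easier to follow with concrete numbers. The sketch below copies the arithmetic from msgs_left into a standalone function that takes the two counters as plain integers; counter_diff is a hypothetical name, not part of the class.

```ruby
# Standalone copy of the arithmetic in msgs_left, taking the two counters
# as plain integers instead of reading them from memcache.
def counter_diff(latest_added, latest_read)
  if latest_added > 2**63 && latest_read < 2**62
    latest_read += 2**64   # treat latest_read as having wrapped past zero
  elsif latest_read > 2**63 && latest_added < 2**62
    latest_added += 2**64  # treat latest_added as having wrapped past zero
  end
  latest_added - latest_read
end

counter_diff(10, 4)         # => 6, no wrap involved
counter_diff(3, 2**64 - 2)  # => 5, latest_added has wrapped
counter_diff(2**64 - 1, 1)  # => -2, latest_read has wrapped
```

A negative result means latest_read is ahead of latest_added. msgs_left returns it unchanged only when neg_value is true and otherwise clamps it to 0.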

#shutdown ⇒ Object



# File 'lib/memcache_queue.rb', line 93

def shutdown
  @shutdown = true
end