Class: ZK::MessageQueue

Inherits:
Object
Defined in:
lib/zk/message_queue.rb

Overview

Implements a simple message queue based on ZooKeeper recipes. These queues are suitable for low-volume use only: because of the way ZooKeeper works, all message titles have to be read into memory in order to determine which message to process next.
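
To illustrate why the full list of titles has to be held in memory, here is a minimal sketch in plain Ruby with no ZooKeeper connection. It assumes titles end in the zero-padded counter that ZooKeeper appends to sequential nodes. `next_title` is a hypothetical helper, not part of ZK::MessageQueue, and the library's own selection logic may differ.

```ruby
# Hypothetical helper: pick the oldest message from a full child listing.
# A children listing comes back in no guaranteed order, so every title has
# to be fetched and compared before the next message is known.
def next_title(titles)
  titles.min_by { |title| title[/\d+\z/].to_i }
end

titles = %w[message0000000012 message0000000003 message0000000007]
puts next_title(titles)
```

The cost of each lookup grows with the number of queued messages, which is why a large backlog is a poor fit for this recipe.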

Examples:

queue = zk.queue("somequeue")
queue.publish(some_string)
queue.poll! # will return one message
#subscribe will handle messages as they come in
queue.subscribe do |title, data|
  #handle message
end


Instance Method Details

#delete_message(message_title) ⇒ Object

You rarely need to call this method directly, but it lets you remove a message from the queue by specifying its title.

Parameters:

  • message_title (String)

    the title of the message to remove



# File 'lib/zk/message_queue.rb', line 51

def delete_message(message_title)
  full_path = "#{full_queue_path}/#{message_title}"
  locker = @zk.locker(full_path)
  # only delete while holding the message's lock
  if locker.lock!
    begin
      @zk.delete(full_path)
      return true
    ensure
      locker.unlock!
    end
  else
    return false
  end
end
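
The lock-then-delete pattern above can be tried without a ZooKeeper server. The sketch below is only an illustration: `FakeZK`, `FakeLocker`, and `delete_with_lock` are hypothetical in-memory stand-ins, not part of the ZK library, and the fake lock is assumed to be non-blocking.

```ruby
# In-memory stand-ins (hypothetical, for illustration only).
class FakeLocker
  def initialize(held, name)
    @held = held
    @name = name
  end

  # Non-blocking: true if the lock was acquired, false if it is already held.
  def lock!
    return false if @held[@name]
    @held[@name] = true
  end

  def unlock!
    @held.delete(@name)
  end
end

class FakeZK
  attr_reader :nodes

  def initialize
    @nodes = {}
    @held = {}
  end

  def locker(name)
    FakeLocker.new(@held, name)
  end

  def delete(path)
    @nodes.delete(path)
  end
end

# Same shape as delete_message: delete only while holding the lock,
# and always release the lock afterwards.
def delete_with_lock(zk, path)
  locker = zk.locker(path)
  return false unless locker.lock!
  begin
    zk.delete(path)
    true
  ensure
    locker.unlock!
  end
end

zk = FakeZK.new
zk.nodes["/queue/a"] = "payload"
puts delete_with_lock(zk, "/queue/a")   # lock was free

other = zk.locker("/queue/b")
other.lock!
puts delete_with_lock(zk, "/queue/b")   # lock is held elsewhere
```

A `false` result means another client currently holds the message's lock, so the caller can retry later rather than treat it as an error.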

#destroy! ⇒ Object

WARNING: highly destructive method! Deletes the queue and all messages in it.



# File 'lib/zk/message_queue.rb', line 103

def destroy!
  unsubscribe  # first thing, make sure we don't get any callbacks related to this
  children = @zk.children(full_queue_path)
  locks = []
  # take a lock on every message before deleting anything
  children.each do |path|
    lock = @zk.locker("#{full_queue_path}/#{path}")
    lock.lock!    # XXX(slyphon): should this be a blocking lock?
    locks << lock
  end
  # NOTE: the modifier form of rescue catches any StandardError, not only NoNode
  children.each do |path|
    @zk.delete("#{full_queue_path}/#{path}") rescue ZK::Exceptions::NoNode
  end
  @zk.delete(full_queue_path) rescue ZK::Exceptions::NoNode
  locks.each do |lock|
    lock.unlock!
  end
end
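
In the listing above the locks are released only if every step before the final loop completes. A possible variation, shown here as a sketch rather than as what the library does, releases them in an `ensure` clause. `DemoLock` and `with_all_locks` are hypothetical names used only for this illustration.

```ruby
# Hypothetical lock object for illustration; it only records its state.
class DemoLock
  attr_reader :held

  def initialize
    @held = false
  end

  def lock!
    @held = true
  end

  def unlock!
    @held = false
  end
end

# Acquire every lock, run the block, and release whatever was acquired,
# even if the block raises.
def with_all_locks(locks)
  acquired = []
  locks.each do |lock|
    lock.lock!
    acquired << lock
  end
  yield
ensure
  acquired.each(&:unlock!)
end

locks = [DemoLock.new, DemoLock.new]
begin
  with_all_locks(locks) { raise "delete failed" }
rescue RuntimeError
  # the error still propagates to the caller, but the locks were released
end
p locks.map(&:held)
```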

#messages ⇒ Object

a list of the message titles in the queue



# File 'lib/zk/message_queue.rb', line 97

def messages
  @zk.children(full_queue_path)
end

#poll! ⇒ Object

Grabs one message from the queue. Use this when you don't want to, or can't, subscribe.

See Also:

  • ZK::MessageQueue#subscribe


# File 'lib/zk/message_queue.rb', line 69

def poll!
  find_and_process_next_available(messages)
end

#publish(data, message_title = nil) ⇒ Object

Publishes a message to the queue. You can optionally supply a message title to guarantee that the message is unique in the queue.

Parameters:

  • data (String)

    any arbitrary string value

  • message_title (String) (defaults to: nil)

    optional; specify a unique message title for this message



# File 'lib/zk/message_queue.rb', line 35

def publish(data, message_title = nil)
  mode = :persistent_sequential
  if message_title
    mode = :persistent
  else
    message_title = "message"
  end
  @zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
rescue KeeperException::NodeExists
  return false
end
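
The two create modes behave differently with respect to uniqueness. The sketch below models that in memory, without a ZooKeeper server: `FakeStore` and its nested `NodeExists` error are hypothetical stand-ins, the `/queue` path is made up, and the fake keeps a single counter rather than reproducing ZooKeeper's numbering exactly.

```ruby
# Hypothetical in-memory stand-in for the two create modes used by publish.
class FakeStore
  class NodeExists < StandardError; end

  attr_reader :nodes

  def initialize
    @nodes = {}
    @counter = 0
  end

  def create(path, data, mode:)
    if mode == :persistent_sequential
      # Sequential nodes get a zero-padded, increasing counter appended.
      path = format("%s%010d", path, @counter)
      @counter += 1
    end
    raise NodeExists, path if @nodes.key?(path)
    @nodes[path] = data
    path
  end
end

# Same decision as publish: a title means a plain node that must be unique,
# no title means a sequential node that is always new.
def publish(store, data, message_title = nil)
  mode = message_title ? :persistent : :persistent_sequential
  store.create("/queue/#{message_title || 'message'}", data, mode: mode)
rescue FakeStore::NodeExists
  false
end

store = FakeStore.new
p publish(store, "a")            # untitled: sequential name
p publish(store, "b")            # untitled: next sequential name
p publish(store, "c", "job-1")   # titled: created
p publish(store, "d", "job-1")   # duplicate title: rejected with false
```

Passing a title turns publishing into a "create if absent" operation, which is how titled messages stay unique.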

#subscribe {|title, data| ... } ⇒ Object

Examples:

# subscribe like this:
subscribe {|title, data| handle_message!; true}
# returning true in the block deletes the message, false unlocks and requeues

Yields:

  • (title, data)

    yield to your block with the message title and the data of the message



# File 'lib/zk/message_queue.rb', line 79

def subscribe(&block)
  @subscription_block = block
  @sub = @zk.register(full_queue_path) do |event, zk|
    find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
  end

  find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
end
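
The block's return value decides what happens to the message. A minimal in-memory model of that contract is sketched below. `process_next` is a hypothetical helper and the queue is a plain Hash, so this shows only the delete-or-keep decision, not locking or watches.

```ruby
# Hypothetical model of the block contract: a truthy return deletes the
# message, a falsy return leaves it in the queue to be picked up again.
def process_next(queue)
  title, data = queue.first
  return nil if title.nil?          # nothing queued
  handled = yield(title, data)
  queue.delete(title) if handled
  handled ? true : false
end

queue = { "message0000000000" => "ok", "message0000000001" => "retry me" }
process_next(queue) { |title, data| true }    # handled, so it is removed
process_next(queue) { |title, data| false }   # not handled, so it stays
p queue.keys
```

Because a falsy return keeps the message, a block that ends with a statement returning `nil` (for example `puts`) will never clear the queue. End the block with an explicit `true` once the message has been handled.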

#unsubscribe ⇒ Object

stop listening to this queue



# File 'lib/zk/message_queue.rb', line 89

def unsubscribe
  if @sub
    @sub.unsubscribe
    @sub = nil
  end
end