Class: ZK::MessageQueue
- Inherits: Object
- Defined in: lib/zk/message_queue.rb
Overview
Implements a simple message queue based on ZooKeeper recipes. These queues are suitable for low-volume use only: because of the way ZooKeeper works, all message titles have to be read into memory in order to determine which message to process next.
Instance Method Summary
- #delete_message(message_title) ⇒ Object
Removes a message from the queue by its title. You rarely need to call this method directly.
- #destroy! ⇒ Object
WARNING: highly destructive! Deletes the queue and all messages in it.
- #messages ⇒ Object
Returns a list of the message titles in the queue.
- #poll! ⇒ Object
Grabs one message from the queue. Use this when you don't want to, or can't, subscribe.
- #publish(data, message_title = nil) ⇒ Object
Publishes a message to the queue. You can optionally supply a message title to guarantee that the message is unique in the queue.
- #subscribe {|title, data| ... } ⇒ Object
- #unsubscribe ⇒ Object
Stops listening to this queue.
Instance Method Details
#delete_message(message_title) ⇒ Object
Removes a message from the queue by its title. You rarely need to call this method directly. Returns true if the message's lock was acquired and the delete was issued, false if the lock could not be acquired.

    # File 'lib/zk/message_queue.rb', line 51
    def delete_message(message_title)
      full_path = "#{full_queue_path}/#{message_title}"
      locker = @zk.locker("#{full_queue_path}/#{message_title}")
      if locker.lock!
        begin
          @zk.delete(full_path)
          return true
        ensure
          locker.unlock!
        end
      else
        return false
      end
    end
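The lock-then-delete pattern in `delete_message` can be tried out without a ZooKeeper server. The following is a minimal sketch, assuming a hypothetical in-memory `FakeLocker` (not part of zk) that mimics only the non-blocking `lock!`/`unlock!` calls seen in the listing above; `guarded_delete` and the `nodes` hash are likewise illustrative names.

```ruby
# Hypothetical non-blocking lock; stands in for the object returned by @zk.locker.
class FakeLocker
  @held = {}
  class << self
    attr_reader :held
  end

  def initialize(name)
    @name = name
  end

  # Returns true if the lock was acquired, false if it is already held.
  def lock!
    return false if self.class.held[@name]
    self.class.held[@name] = true
  end

  def unlock!
    self.class.held.delete(@name)
  end
end

# Same control flow as delete_message: delete only while holding the lock,
# and always release the lock afterwards.
def guarded_delete(nodes, path)
  locker = FakeLocker.new(path)
  return false unless locker.lock!
  begin
    nodes.delete(path)
    true
  ensure
    locker.unlock!
  end
end

nodes = { "/queue/job-1" => "payload", "/queue/job-2" => "payload" }

deleted = guarded_delete(nodes, "/queue/job-1")  # lock is free, node is removed

busy = FakeLocker.new("/queue/job-2")
busy.lock!                                       # simulate another consumer holding the lock
skipped = guarded_delete(nodes, "/queue/job-2")  # lock unavailable, nothing is deleted
busy.unlock!
```

The `ensure` clause is what keeps a failed delete from leaving the message locked: the lock is released whether or not the delete raises.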
#destroy! ⇒ Object
WARNING: highly destructive! Deletes the queue and all messages in it.

    # File 'lib/zk/message_queue.rb', line 103
    def destroy!
      unsubscribe # first thing, make sure we don't get any callbacks related to this
      children = @zk.children(full_queue_path)
      locks = []
      children.each do |path|
        lock = @zk.locker("#{full_queue_path}/#{path}")
        lock.lock! # XXX(slyphon): should this be a blocking lock?
        locks << lock
      end
      children.each do |path|
        @zk.delete("#{full_queue_path}/#{path}") rescue ZK::Exceptions::NoNode
      end
      @zk.delete(full_queue_path) rescue ZK::Exceptions::NoNode
      locks.each do |lock|
        lock.unlock!
      end
    end
#messages ⇒ Object
Returns a list of the message titles in the queue.

    # File 'lib/zk/message_queue.rb', line 97
    def messages
      @zk.children(full_queue_path)
    end
#poll! ⇒ Object
Grabs one message from the queue. Use this when you don't want to, or can't, subscribe.

    # File 'lib/zk/message_queue.rb', line 69
    def poll!
      find_and_process_next_available(messages)
    end
#publish(data, message_title = nil) ⇒ Object
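Publishes a message to the queue. You can optionally supply a message title to guarantee that the message is unique in the queue: a titled message is created as a plain persistent node, so a second publish with the same title returns false, while an untitled message is created as a persistent sequential node.

    # File 'lib/zk/message_queue.rb', line 35
    def publish(data, message_title = nil)
      mode = :persistent_sequential
      if message_title
        mode = :persistent
      else
        message_title = "message"
      end
      @zk.create("#{full_queue_path}/#{message_title}", data, :mode => mode)
    rescue KeeperException::NodeExists
      return false
    end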
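The mode selection in `publish` can be illustrated without a running ZooKeeper ensemble. The sketch below uses a hypothetical in-memory stand-in (`FakeStore`, not part of zk) that mimics only two behaviors: sequential nodes get a zero-padded counter suffix, and creating a node that already exists fails. `sketch_publish` repeats the branching from the listing above against that stand-in; the paths and titles are made up for illustration.

```ruby
# Hypothetical in-memory stand-in for a ZooKeeper client; not part of zk.
class FakeStore
  class NodeExists < StandardError; end

  def initialize
    @nodes = {}
    @counter = 0
  end

  # Mimics create: the sequential mode appends a zero-padded counter to the path.
  def create(path, data, mode:)
    if mode == :persistent_sequential
      path = format("%s%010d", path, @counter)
      @counter += 1
    end
    raise NodeExists, path if @nodes.key?(path)
    @nodes[path] = data
    path
  end

  def children
    @nodes.keys.map { |p| File.basename(p) }.sort
  end
end

# Same branching as publish, written against the stand-in.
def sketch_publish(store, queue_path, data, message_title = nil)
  mode = :persistent_sequential
  if message_title
    mode = :persistent
  else
    message_title = "message"
  end
  store.create("#{queue_path}/#{message_title}", data, mode: mode)
rescue FakeStore::NodeExists
  false
end

store = FakeStore.new
first  = sketch_publish(store, "/queue", "a")            # untitled: always accepted
second = sketch_publish(store, "/queue", "b")            # untitled: gets the next suffix
titled = sketch_publish(store, "/queue", "c", "job-42")  # titled: accepted once
dup    = sketch_publish(store, "/queue", "d", "job-42")  # same title: rejected
```

Untitled messages never collide because each gets a fresh suffix, whereas a title maps to exactly one node name, so the second `job-42` is refused.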
#subscribe {|title, data| ... } ⇒ Object
    # File 'lib/zk/message_queue.rb', line 79
    def subscribe(&block)
      @subscription_block = block
      @sub = @zk.register(full_queue_path) do |event, zk|
        find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
      end

      find_and_process_next_available(@zk.children(full_queue_path, :watch => true))
    end
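`subscribe` reads the child list with `:watch => true` both inside the callback and once right after registering. ZooKeeper watches fire only once, so each read has to re-arm the watch for the next change. The sketch below models that with a hypothetical in-memory `FakeDir` (not part of zk). What `find_and_process_next_available` does internally is not shown on this page, so the `process_next` lambda here is an assumption: it takes the lowest-sorted title, records it, and removes it.

```ruby
# Hypothetical in-memory directory with one-shot child watches, loosely
# modelled on ZooKeeper semantics; not part of zk.
class FakeDir
  def initialize
    @children = []
    @callback = nil
    @armed = false
  end

  def register(&block)
    @callback = block
  end

  # Reading with watch: true arms a single notification for the next change.
  def children(watch: false)
    @armed = true if watch
    @children.dup
  end

  def add(name)
    @children << name
    fire
  end

  def remove(name)
    @children.delete(name)
    fire
  end

  private

  def fire
    return unless @armed && @callback
    @armed = false # one-shot: the next read must re-arm the watch
    @callback.call
  end
end

dir = FakeDir.new
processed = []

# Assumed consumer: take the lowest-sorted title, record it, then remove it.
process_next = lambda do |titles|
  title = titles.min
  next unless title
  processed << title
  dir.remove(title)
end

dir.register { process_next.call(dir.children(watch: true)) }
process_next.call(dir.children(watch: true)) # initial pass, also arms the watch

dir.add("message0000000000")
dir.add("message0000000001")
```

Without the initial pass after `register`, no watch would be armed and messages already in the queue would sit unprocessed until some later change happened to be observed.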
#unsubscribe ⇒ Object
Stops listening to this queue.

    # File 'lib/zk/message_queue.rb', line 89
    def unsubscribe
      if @sub
        @sub.unsubscribe
        @sub = nil
      end
    end