Class: Superbolt::Queue
- Inherits:
-
Object
- Object
- Superbolt::Queue
- Defined in:
- lib/superbolt/queue.rb
Instance Attribute Summary
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary
- #all ⇒ Object
- #clear ⇒ Object
- #connection ⇒ Object
- #delete ⇒ Object
-
#initialize(name, config = nil) ⇒ Queue
constructor
A new instance of Queue.
- #peek ⇒ Object
- #pop ⇒ Object
- #push(message) ⇒ Object
-
#read ⇒ Object
TODO: roll up some of these subscribe methods.
- #size ⇒ Object
Constructor Details
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
3 4 5 |
# File 'lib/superbolt/queue.rb', line 3

# Reader for the +config+ attribute.
#
# @return [Object] the value currently held in @config
def config
  @config
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/superbolt/queue.rb', line 3

# Reader for the +name+ attribute.
#
# @return [Object] the value currently held in @name
def name
  @name
end
Instance Method Details
#all ⇒ Object
49 50 51 |
# File 'lib/superbolt/queue.rb', line 49

# Collects everything returned by #read and converts each item by calling
# +parse+ on it.
#
# @return [Array] one parsed value per item returned by #read
def all
  read.map(&:parse)
end
#clear ⇒ Object
30 31 32 33 34 |
# File 'lib/superbolt/queue.rb', line 30

# Calls +purge+ on the underlying queue object (+q+) inside a +closing+ block.
# NOTE(review): presumably this drops every pending message, as Bunny's
# Queue#purge does -- confirm against the connection wrapper.
#
# @return [Object] the result of the +closing+ call
def clear
  closing { q.purge }
end
#connection ⇒ Object
10 11 12 |
# File 'lib/superbolt/queue.rb', line 10

# Lazily builds the Connection::Queue for this queue's +name+ and +config+ and
# caches it in @connection, so later calls reuse the same object.
#
# @return [Connection::Queue] the memoized connection wrapper
def connection
  @connection ||= Connection::Queue.new(name, config)
end
#delete ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/superbolt/queue.rb', line 71

# Subscribes to the queue and acknowledges only the messages the caller's
# block selects, collecting their parsed form.
#
# NOTE(review): the local names (messages, message, metadata) and the final
# `messages` return line were restored from a listing that had lost them --
# confirm against lib/superbolt/queue.rb.
#
# @yield [parsed] the result of +parse+ for each delivered message
# @yieldreturn [Boolean] truthy when the message should be acknowledged
# @return [Array] parsed contents of every message that was acknowledged
def delete
  messages = []
  closing do
    q.subscribe(:ack => true) do |delivery_info, _metadata, payload|
      message = IncomingMessage.new(delivery_info, payload, channel)
      relevant = yield(message.parse)
      if relevant
        messages << message.parse
        message.ack
      end
    end

    # channel is closed by block before message ack can complete
    # therefore we must sleep :(
    sleep 0.02
  end
  messages
end
#peek ⇒ Object
53 54 55 56 57 |
# File 'lib/superbolt/queue.rb', line 53

# Takes one message off the queue with #pop, hands it straight back to #push,
# and returns it.
#
# NOTE(review): the local name and the trailing return line were restored from
# a listing that had lost them -- confirm against lib/superbolt/queue.rb.
# NOTE(review): whatever #pop returns is re-pushed unconditionally, including
# any "nothing available" value -- verify that is intended.
#
# @return [Object] the value returned by #pop
def peek
  message = pop
  push(message)
  message
end
#pop ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/superbolt/queue.rb', line 59

# Pops a single delivery from the underlying queue (+q+) inside a +closing+
# block, wraps it in an IncomingMessage and parses it.
#
# NOTE(review): the block parameter and local names were restored from a
# listing that had lost them -- confirm against lib/superbolt/queue.rb.
#
# @return [Object] the result of the +closing+ call
def pop
  closing do
    q.pop do |delivery_info, _metadata, payload|
      message = IncomingMessage.new(delivery_info, payload, channel)
      message && message.parse
    end
  end
end
#push(message) ⇒ Object
18 19 20 21 22 |
# File 'lib/superbolt/queue.rb', line 18

# Serializes +message+ with +to_json+ and publishes it through +writer+, using
# this queue's +name+ as the routing key. The publish runs inside a +closing+
# block.
#
# @param message [#to_json] the payload to publish
# @return [Object] the result of the +closing+ call
def push(message)
  closing do
    writer.publish(message.to_json, routing_key: name)
  end
end
#read ⇒ Object
TODO: roll up some of these subscribe methods
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/superbolt/queue.rb', line 38

# Subscribes to the underlying queue (+q+) inside a +closing+ block and wraps
# every delivery in an IncomingMessage. Nothing is acknowledged here.
#
# NOTE(review): the local names (messages, message, metadata) and the final
# `messages` return line were restored from a listing that had lost them --
# confirm against lib/superbolt/queue.rb.
#
# @return [Array<IncomingMessage>] one wrapper per delivered message
def read
  messages = []
  closing do
    q.subscribe(:ack => true) do |delivery_info, _metadata, payload|
      messages << IncomingMessage.new(delivery_info, payload, channel)
    end
  end
  messages
end
#size ⇒ Object
24 25 26 27 28 |
# File 'lib/superbolt/queue.rb', line 24

# Asks the underlying queue (+q+) for its message count inside a +closing+
# block.
#
# NOTE(review): the method name after `q.` was missing from the listing and is
# restored here as `message_count` (the Bunny queue API) -- confirm against
# lib/superbolt/queue.rb.
#
# @return [Object] the result of the +closing+ call
def size
  closing do
    q.message_count
  end
end