Class: Norikra::OutputPool
- Inherits:
-
Object
- Object
- Norikra::OutputPool
- Defined in:
- lib/norikra/output_pool.rb
Instance Attribute Summary collapse
-
#pool ⇒ Object
Returns the value of attribute pool.
Instance Method Summary collapse
-
#fetch(query_name) ⇒ Object
Returns [[time, event], …] without removing the events from the pool.
-
#initialize ⇒ OutputPool
constructor
A new instance of OutputPool.
-
#pop(query_name) ⇒ Object
returns [[time(int from epoch), event], …], event: hash.
-
#push(query_name, query_group, events) ⇒ Object
events must be an array of [time(int), event_record] pairs.
- #queries ⇒ Object
- #remove(query_name, query_group) ⇒ Object
-
#sweep(group = nil) ⇒ Object
Returns { query_name => [[time, event], …] } for the queries of the given group.
Constructor Details
#initialize ⇒ OutputPool
Returns a new instance of OutputPool.
7 8 9 10 11 |
# File 'lib/norikra/output_pool.rb', line 7

# Builds an empty pool.
#
# State created here:
#   @pool   - { query_name => [events] }
#   @groups - { group_name => Set(query_names) }
#   @mutex  - lock used by the other methods around @pool / @groups access
def initialize
  @pool, @groups = {}, {}
  @mutex = Mutex.new
end
Instance Attribute Details
#pool ⇒ Object
Returns the value of attribute pool.
5 6 7 |
# File 'lib/norikra/output_pool.rb', line 5

# Reader for the pool attribute.
#
# Hands back the stored object itself (no copy is made).
def pool
  @pool
end
Instance Method Details
#fetch(query_name) ⇒ Object
Returns [[time, event], …] without removing the events from the pool.
46 47 48 49 50 51 |
# File 'lib/norikra/output_pool.rb', line 46

# Reads the events pooled for +query_name+ and leaves the pool untouched.
#
# The pool keeps one array per push call; those arrays are concatenated
# here into a single flat list of [time, event] pairs.
#
# Returns [] when nothing is pooled for the query.
def fetch(query_name)
  # Only the lookup happens under the lock; concatenation runs afterwards.
  chunks = @mutex.synchronize { @pool.fetch(query_name, []) }
  merged = chunks.inject { |acc, chunk| acc + chunk }
  merged || []
end
#pop(query_name) ⇒ Object
returns [[time(int from epoch), event], …], event: hash
54 55 56 57 58 59 |
# File 'lib/norikra/output_pool.rb', line 54

# Takes the events pooled for +query_name+ out of the pool and returns them
# as one flat list of [time(int from epoch), event] pairs (event: hash).
#
# Returns [] when nothing is pooled for the query.
def pop(query_name)
  # The entry is deleted under the lock; concatenation runs afterwards.
  removed = @mutex.synchronize { @pool.delete(query_name) }
  chunks = removed || []
  merged = chunks.inject { |acc, chunk| acc + chunk }
  merged || []
end
#push(query_name, query_group, events) ⇒ Object
events must be an array of [time(int), event_record] pairs
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/norikra/output_pool.rb', line 29

# Appends a batch of events to the pool entry of +query_name+ and records
# the query as a member of +query_group+.
#
# events - array of [time(int), event_record] pairs. It is stored as one
#          element of the query's pool entry, not merged into it.
#
# Returns nil without touching any state when +events+ is empty.
def push(query_name, query_group, events)
  # called with blank events for window leavings (and/or other situations)
  return if events.size < 1

  @mutex.synchronize do
    # Set membership is unique, so adding an already-known query is harmless.
    (@groups[query_group] ||= Set.new).add(query_name)
    (@pool[query_name] ||= []).push(events)
  end
end
#queries ⇒ Object
13 14 15 |
# File 'lib/norikra/output_pool.rb', line 13

# Lists the names of the queries that currently have an entry in the pool.
# NOTE(review): this read is not wrapped in @mutex, unlike the other accessors.
def queries
  @pool.each_key.to_a
end
#remove(query_name, query_group) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/norikra/output_pool.rb', line 17

# Forgets a query: drops it from the member set of +query_group+ and
# discards whatever events are still pooled for it.
#
# @groups is keyed by group name, so only the query's membership inside its
# group is removed here. Deleting a @groups entry by query name would wipe
# an unrelated group that happens to share that name, so no such deletion
# is performed. The (possibly empty) member set of +query_group+ is kept.
#
# Always returns nil.
def remove(query_name, query_group)
  @mutex.synchronize do
    members = @groups[query_group]
    members.delete(query_name) if members
    @pool.delete(query_name)
  end
  nil
end
#sweep(group = nil) ⇒ Object
returns { query_name => [[time, event], …] } for the queries of the given group
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/norikra/output_pool.rb', line 62

# Drains the pooled events of every query that belongs to +group+.
#
# group - group name; nil (the default) is itself used as a lookup key.
#
# Returns a Hash { query_name => [[time, event], ...] } that contains only
# queries which had pooled events; those entries are removed from the pool.
# Returns {} when the group is unknown.
def sweep(group=nil)
  swept = @mutex.synchronize do
    # Look the group up once, inside the lock, so a concurrent change to
    # @groups between the check and the iteration cannot leave us with nil.
    members = @groups[group] || []
    members.each_with_object({}) do |qname, taken|
      chunks = @pool[qname]
      taken[qname] = @pool.delete(qname) if chunks && chunks.size > 0
    end
  end

  # Concatenate the per-push batches outside the lock.
  swept.each_with_object({}) do |(qname, chunks), result|
    result[qname] = chunks.reduce(&:+) || []
  end
end