Class: Streamdal::Metrics
- Inherits:
-
Object
- Object
- Streamdal::Metrics
- Defined in:
- lib/metrics.rb
Defined Under Namespace
Classes: CounterEntry
Constant Summary collapse
- COUNTER_CONSUME_BYTES =
'counter_consume_bytes'- COUNTER_CONSUME_PROCESSED =
'counter_consume_processed'- COUNTER_CONSUME_ERRORS =
'counter_consume_errors'- COUNTER_PRODUCE_BYTES =
'counter_produce_bytes'- COUNTER_PRODUCE_PROCESSED =
'counter_produce_processed'- COUNTER_PRODUCE_ERRORS =
'counter_produce_errors'- COUNTER_NOTIFY =
'counter_notify'- COUNTER_DROPPED_TAIL_MESSAGES =
'counter_dropped_tail_messages'- COUNTER_CONSUME_BYTES_RATE =
'counter_consume_bytes_rate'- COUNTER_PRODUCE_BYTES_RATE =
'counter_produce_bytes_rate'- COUNTER_CONSUME_PROCESSED_RATE =
'counter_consume_processed_rate'- COUNTER_PRODUCE_PROCESSED_RATE =
'counter_produce_processed_rate'- WORKER_POOL_SIZE =
3- DEFAULT_COUNTER_REAPER_INTERVAL =
10- DEFAULT_COUNTER_TTL =
10- DEFAULT_COUNTER_PUBLISH_INTERVAL =
1
Class Method Summary collapse
Instance Method Summary collapse
- #get_counter(ce) ⇒ Object
- #incr(ce) ⇒ Object
-
#initialize(cfg) ⇒ Metrics
constructor
A new instance of Metrics.
- #new_counter(ce) ⇒ Object
- #remove_counter(name) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(cfg) ⇒ Metrics
Returns a new instance of Metrics.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/metrics.rb', line 58 def initialize(cfg) raise ArgumentError, 'cfg is nil' if cfg.nil? @cfg = cfg @log = cfg[:log] @counters = {} @counters_mtx = Mutex.new @exit = false @incr_queue = Queue.new @publish_queue = Queue.new @workers = [] @stub = Streamdal::Protos::Internal::Stub.new(@cfg[:streamdal_url], :this_channel_is_insecure) _start end |
Class Method Details
.composite_id(counter_name, labels = {}) ⇒ Object
87 88 89 90 |
# File 'lib/metrics.rb', line 87 def self.composite_id(counter_name, labels = {}) labels = {} if labels.nil? "#{counter_name}-#{labels.values.join('-')}" end |
Instance Method Details
#get_counter(ce) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/metrics.rb', line 92 def get_counter(ce) raise ArgumentError, 'ce is nil' if ce.nil? k = Metrics.composite_id(ce.name, ce.labels) @counters_mtx.synchronize do @counters[k] if @counters.key?(k) end # No counter exists, create a new one and return it new_counter(ce) end |
#incr(ce) ⇒ Object
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/metrics.rb', line 115 def incr(ce) c = get_counter(ce) if c.nil? new_counter(ce) nil end @incr_queue.push(ce) end |
#new_counter(ce) ⇒ Object
105 106 107 108 109 110 111 112 113 |
# File 'lib/metrics.rb', line 105 def new_counter(ce) c = Counter.new(ce.name, ce.aud, ce.labels, ce.value) @counters_mtx.synchronize do @counters[Metrics.composite_id(ce.name, ce.labels)] = c end c end |
#remove_counter(name) ⇒ Object
126 127 128 129 130 |
# File 'lib/metrics.rb', line 126 def remove_counter(name) @counters_mtx.synchronize do @counters.delete(name) end end |
#shutdown ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/metrics.rb', line 74 def shutdown # Set exit flag so workers exit @exit = true # Let loops exit sleep(1) # Exit any remaining threads @workers.each do |w| w.exit if w.running? end end |