Class: Streamdal::Metrics
- Inherits:
-
Object
- Object
- Streamdal::Metrics
- Defined in:
- lib/metrics.rb
Defined Under Namespace
Classes: CounterEntry
Constant Summary collapse
- COUNTER_CONSUME_BYTES = "counter_consume_bytes"
- COUNTER_CONSUME_PROCESSED = "counter_consume_processed"
- COUNTER_CONSUME_ERRORS = "counter_consume_errors"
- COUNTER_PRODUCE_BYTES = "counter_produce_bytes"
- COUNTER_PRODUCE_PROCESSED = "counter_produce_processed"
- COUNTER_PRODUCE_ERRORS = "counter_produce_errors"
- COUNTER_NOTIFY = "counter_notify"
- COUNTER_DROPPED_TAIL_MESSAGES = "counter_dropped_tail_messages"
- COUNTER_CONSUME_BYTES_RATE = "counter_consume_bytes_rate"
- COUNTER_PRODUCE_BYTES_RATE = "counter_produce_bytes_rate"
- COUNTER_CONSUME_PROCESSED_RATE = "counter_consume_processed_rate"
- COUNTER_PRODUCE_PROCESSED_RATE = "counter_produce_processed_rate"
- WORKER_POOL_SIZE = 3
- DEFAULT_COUNTER_REAPER_INTERVAL = 10
- DEFAULT_COUNTER_TTL = 10
- DEFAULT_COUNTER_PUBLISH_INTERVAL = 1
Class Method Summary collapse
Instance Method Summary collapse
- #get_counter(ce) ⇒ Object
- #incr(ce) ⇒ Object
-
#initialize(cfg) ⇒ Metrics
constructor
A new instance of Metrics.
- #new_counter(ce) ⇒ Object
- #remove_counter(name) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(cfg) ⇒ Metrics
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/metrics.rb', line 56
#
# Builds a new Metrics instance.
#
# Stores the configuration, reads the logger from cfg[:log], prepares the
# counter table with its mutex, the increment/publish queues and an empty
# worker list, creates the Streamdal::Protos::Internal::Stub for
# cfg[:streamdal_url], and finally calls _start.
#
# @param cfg [#[]] configuration; :log and :streamdal_url are read here
# @raise [ArgumentError] if cfg is nil
def initialize(cfg)
  raise ArgumentError, "cfg is nil" if cfg.nil?

  @cfg = cfg
  @log = cfg[:log]

  # Counter table and the lock guarding it.
  @counters_mtx = Mutex.new
  @counters = {}

  # Flag set to true by #shutdown.
  @exit = false
  @workers = []

  @incr_queue = Queue.new
  @publish_queue = Queue.new

  @stub = Streamdal::Protos::Internal::Stub.new(@cfg[:streamdal_url], :this_channel_is_insecure)

  _start
end
Class Method Details
.composite_id(counter_name, labels = {}) ⇒ Object
89 90 91 92 93 94 |
# File 'lib/metrics.rb', line 89
#
# Builds the identifier used as the key for a counter: the counter name,
# a dash, and the label values joined by dashes (in the hash's own order).
#
# @param counter_name [#to_s] name of the counter
# @param labels [Hash, nil] label pairs; nil is treated as an empty hash
# @return [String] frozen string of the form "name-value1-value2"
def self.composite_id(counter_name, labels = {})
  labels = {} if labels.nil?
  joined_values = labels.values.join("-")
  "#{counter_name}-#{joined_values}".freeze
end
Instance Method Details
#get_counter(ce) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/metrics.rb', line 96
#
# Returns the counter registered for the given entry, creating it on first use.
#
# The lookup key is Metrics::composite_id(ce.name, ce.labels). When a counter
# is already stored under that key it is returned as-is; previously the lookup
# result was discarded and a fresh counter always replaced the stored one.
#
# @param ce [#name, #labels] counter entry describing the counter
# @return [Object] the existing counter, or the one created by new_counter
# @raise [ArgumentError] if ce is nil
def get_counter(ce)
  raise ArgumentError, "ce is nil" if ce.nil?

  k = Metrics::composite_id(ce.name, ce.labels)

  # `return` inside the block leaves the method; synchronize still releases
  # the lock on the way out.
  @counters_mtx.synchronize do
    return @counters[k] if @counters.key?(k)
  end

  # No counter exists, create a new one and return it. This happens outside
  # the synchronize block because new_counter takes the same mutex itself.
  new_counter(ce)
end
#incr(ce) ⇒ Object
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/metrics.rb', line 123
#
# Queues an increment for the counter described by the given entry.
#
# get_counter is called first so that a counter for the entry exists (it
# creates one when missing, and raises ArgumentError for a nil entry). The
# entry is then pushed onto @incr_queue.
#
# The former `if c.nil?` branch was removed: get_counter never returns nil,
# so the branch could not run, and the bare `nil` inside it had no effect.
#
# @param ce [#name, #labels] counter entry to increment
# @return [Object] whatever @incr_queue.push returns
def incr(ce)
  get_counter(ce)
  @incr_queue.push(ce)
end
#new_counter(ce) ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/metrics.rb', line 113
#
# Creates a Counter from the given entry and registers it in @counters under
# Metrics::composite_id(ce.name, ce.labels), replacing any counter already
# stored under that key.
#
# @param ce [#name, #aud, #labels, #value] counter entry
# @return [Counter] the newly created counter
def new_counter(ce)
  counter = Counter.new(ce.name, ce.aud, ce.labels, ce.value)

  @counters_mtx.synchronize do
    key = Metrics::composite_id(ce.name, ce.labels)
    @counters[key] = counter
  end

  counter
end
#remove_counter(name) ⇒ Object
134 135 136 137 138 |
# File 'lib/metrics.rb', line 134
#
# Deletes the entry stored under the given key from @counters while holding
# the counters mutex.
#
# @param name [Object] key into @counters (new_counter stores counters under
#   the Metrics::composite_id of name and labels)
# @return [Object, nil] the removed counter, or nil when the key was absent
def remove_counter(name)
  @counters_mtx.synchronize { @counters.delete(name) }
end
#shutdown ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/metrics.rb', line 74
#
# Stops the metrics workers.
#
# Sets the @exit flag, waits one second, then calls #exit on every worker
# that is still alive.
#
# Liveness is checked with Thread#alive?; the previous call to `running?`
# is not a method Ruby's Thread provides, so it raised NoMethodError as soon
# as @workers held a thread.
# NOTE(review): assumes @workers holds Thread objects — confirm against _start.
#
# @return [Array] the @workers array (result of #each)
def shutdown
  # Set exit flag so workers exit
  @exit = true

  # Let loops exit
  sleep(1)

  # Exit any remaining threads
  @workers.each do |w|
    w.exit if w.alive?
  end
end