Class: Streamdal::Metrics

Inherits:
Object
  • Object
show all
Defined in:
lib/metrics.rb

Defined Under Namespace

Classes: CounterEntry

Constant Summary collapse

COUNTER_CONSUME_BYTES =
"counter_consume_bytes"
COUNTER_CONSUME_PROCESSED =
"counter_consume_processed"
COUNTER_CONSUME_ERRORS =
"counter_consume_errors"
COUNTER_PRODUCE_BYTES =
"counter_produce_bytes"
COUNTER_PRODUCE_PROCESSED =
"counter_produce_processed"
COUNTER_PRODUCE_ERRORS =
"counter_produce_errors"
COUNTER_NOTIFY =
"counter_notify"
COUNTER_DROPPED_TAIL_MESSAGES =
"counter_dropped_tail_messages"
COUNTER_CONSUME_BYTES_RATE =
"counter_consume_bytes_rate"
COUNTER_PRODUCE_BYTES_RATE =
"counter_produce_bytes_rate"
COUNTER_CONSUME_PROCESSED_RATE =
"counter_consume_processed_rate"
COUNTER_PRODUCE_PROCESSED_RATE =
"counter_produce_processed_rate"
WORKER_POOL_SIZE =
3
DEFAULT_COUNTER_REAPER_INTERVAL =
10
DEFAULT_COUNTER_TTL =
10
DEFAULT_COUNTER_PUBLISH_INTERVAL =
1

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cfg) ⇒ Metrics



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/metrics.rb', line 56

def initialize(cfg)
  if cfg.nil?
    raise ArgumentError, "cfg is nil"
  end

  @cfg = cfg
  @log = cfg[:log]
  @counters = {}
  @counters_mtx = Mutex.new
  @exit = false
  @incr_queue = Queue.new
  @publish_queue = Queue.new
  @workers = []
  @stub = Streamdal::Protos::Internal::Stub.new(@cfg[:streamdal_url], :this_channel_is_insecure)

  _start
end

Class Method Details

.composite_id(counter_name, labels = {}) ⇒ Object



89
90
91
92
93
94
# File 'lib/metrics.rb', line 89

def self.composite_id(counter_name, labels = {})
  if labels.nil?
    labels = {}
  end
  "#{counter_name}-#{labels.values.join("-")}".freeze
end

Instance Method Details

#get_counter(ce) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/metrics.rb', line 96

def get_counter(ce)
  if ce.nil?
    raise ArgumentError, "ce is nil"
  end

  k = Metrics::composite_id(ce.name, ce.labels)

  @counters_mtx.synchronize do
    if @counters.key?(k)
      @counters[k]
    end
  end

  # No counter exists, create a new one and return it
  new_counter(ce)
end

#incr(ce) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/metrics.rb', line 123

def incr(ce)
  c = get_counter(ce)

  if c.nil?
    new_counter(ce)
    nil
  end

  @incr_queue.push(ce)
end

#new_counter(ce) ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/metrics.rb', line 113

def new_counter(ce)
  c = Counter.new(ce.name, ce.aud, ce.labels, ce.value)

  @counters_mtx.synchronize do
    @counters[Metrics::composite_id(ce.name, ce.labels)] = c
  end

  c
end

#remove_counter(name) ⇒ Object



134
135
136
137
138
# File 'lib/metrics.rb', line 134

def remove_counter(name)
  @counters_mtx.synchronize do
    @counters.delete(name)
  end
end

#shutdownObject



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/metrics.rb', line 74

def shutdown
  # Set exit flag so workers exit
  @exit = true

  # Let loops exit
  sleep(1)

  # Exit any remaining threads
  @workers.each do |w|
    if w.running?
      w.exit
    end
  end
end