Class: Streamdal::Metrics

Inherits:
Object
  • Object
show all
Defined in:
lib/metrics.rb

Defined Under Namespace

Classes: CounterEntry

Constant Summary collapse

COUNTER_CONSUME_BYTES =
'counter_consume_bytes'
COUNTER_CONSUME_PROCESSED =
'counter_consume_processed'
COUNTER_CONSUME_ERRORS =
'counter_consume_errors'
COUNTER_PRODUCE_BYTES =
'counter_produce_bytes'
COUNTER_PRODUCE_PROCESSED =
'counter_produce_processed'
COUNTER_PRODUCE_ERRORS =
'counter_produce_errors'
COUNTER_NOTIFY =
'counter_notify'
COUNTER_DROPPED_TAIL_MESSAGES =
'counter_dropped_tail_messages'
COUNTER_CONSUME_BYTES_RATE =
'counter_consume_bytes_rate'
COUNTER_PRODUCE_BYTES_RATE =
'counter_produce_bytes_rate'
COUNTER_CONSUME_PROCESSED_RATE =
'counter_consume_processed_rate'
COUNTER_PRODUCE_PROCESSED_RATE =
'counter_produce_processed_rate'
WORKER_POOL_SIZE =
3
DEFAULT_COUNTER_REAPER_INTERVAL =
10
DEFAULT_COUNTER_TTL =
10
DEFAULT_COUNTER_PUBLISH_INTERVAL =
1

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cfg) ⇒ Metrics

Returns a new instance of Metrics.

Raises:

  • (ArgumentError)


58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/metrics.rb', line 58

def initialize(cfg)
  raise ArgumentError, 'cfg is nil' if cfg.nil?

  @cfg = cfg
  @log = cfg[:log]
  @counters = {}
  @counters_mtx = Mutex.new
  @exit = false
  @incr_queue = Queue.new
  @publish_queue = Queue.new
  @workers = []
  @stub = Streamdal::Protos::Internal::Stub.new(@cfg[:streamdal_url], :this_channel_is_insecure)

  _start
end

Class Method Details

.composite_id(counter_name, labels = {}) ⇒ Object



87
88
89
90
# File 'lib/metrics.rb', line 87

def self.composite_id(counter_name, labels = {})
  labels = {} if labels.nil?
  "#{counter_name}-#{labels.values.join('-')}"
end

Instance Method Details

#get_counter(ce) ⇒ Object

Raises:

  • (ArgumentError)


92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/metrics.rb', line 92

def get_counter(ce)
  raise ArgumentError, 'ce is nil' if ce.nil?

  k = Metrics.composite_id(ce.name, ce.labels)

  @counters_mtx.synchronize do
    @counters[k] if @counters.key?(k)
  end

  # No counter exists, create a new one and return it
  new_counter(ce)
end

#incr(ce) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/metrics.rb', line 115

def incr(ce)
  c = get_counter(ce)

  if c.nil?
    new_counter(ce)
    nil
  end

  @incr_queue.push(ce)
end

#new_counter(ce) ⇒ Object



105
106
107
108
109
110
111
112
113
# File 'lib/metrics.rb', line 105

def new_counter(ce)
  c = Counter.new(ce.name, ce.aud, ce.labels, ce.value)

  @counters_mtx.synchronize do
    @counters[Metrics.composite_id(ce.name, ce.labels)] = c
  end

  c
end

#remove_counter(name) ⇒ Object



126
127
128
129
130
# File 'lib/metrics.rb', line 126

def remove_counter(name)
  @counters_mtx.synchronize do
    @counters.delete(name)
  end
end

#shutdownObject



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/metrics.rb', line 74

def shutdown
  # Set exit flag so workers exit
  @exit = true

  # Let loops exit
  sleep(1)

  # Exit any remaining threads
  @workers.each do |w|
    w.exit if w.running?
  end
end