Class: Hyll::EnhancedHyperLogLog

Inherits:
HyperLogLog show all
Defined in:
lib/hyll/algorithms/enhanced_hyperloglog.rb

Overview

A strictly enhanced version of HyperLogLog with additional features - inspired by Presto’s P4HYPERLOGLOG

Constant Summary

Constants included from Constants

Constants::ALPHA, Constants::DEFAULT_SPARSE_THRESHOLD, Constants::MAX_4BIT_VALUE

Instance Attribute Summary

Attributes inherited from HyperLogLog

#precision

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from HyperLogLog

#add_all, #add_to_registers, #count, #count_nonzero_registers, empty, #get_other_register_value, #initialize_dense_format, #maximum_likelihood_cardinality, #merge_registers, #reset, #set_register_value, #switch_to_dense_format, #to_enhanced, #update_register_from_other

Methods included from Utils::Math

#calculate_h_values, #compute_alpha, #count_leading_zeros, #linear_counting

Methods included from Utils::Hash

#murmurhash3

Constructor Details

#initialize(precision = 10) ⇒ EnhancedHyperLogLog

Returns a new instance of EnhancedHyperLogLog.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 6

# Build an enhanced counter whose registers are stored as a plain array
# from the start.
#
# @param precision [Integer] forwarded unchanged to the parent constructor
def initialize(precision = 10)
  super(precision)

  # Storage: exact counting is switched off, no small-set is kept, and one
  # zeroed register slot is allocated for each of the @m buckets.
  @using_exact_counting = false
  @small_set = nil
  @registers = Array.new(@m, 0)

  # Provenance / bookkeeping flags, all cleared on a fresh instance.
  @is_sequential = false
  @converted_from_standard = false # set when converted from the standard format
  @was_merged = false

  # State of the streaming martingale estimator.
  @streaming_estimate = 0.0
  @quadratic_variation = 0.0
  @last_modification_probability = nil
end

Class Method Details

.deserialize(data) ⇒ EnhancedHyperLogLog

Deserialize a binary string into an EnhancedHyperLogLog

Parameters:

  • data (String)

    binary representation of an EnhancedHyperLogLog

Returns:

Raises:



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 198

# Rebuild an EnhancedHyperLogLog from the binary layout written by #serialize:
# a 4-byte header (format version, precision, enhanced marker, sequential
# flag), a 32-bit big-endian register count, one byte per register and,
# optionally, two little-endian doubles holding the streaming estimator state.
#
# @param data [String] binary representation of an EnhancedHyperLogLog
# @return [EnhancedHyperLogLog] the reconstructed counter
# @raise [Error] when the enhanced marker byte is not 1
def self.deserialize(data)
  header = data.unpack("CCCC")
  precision = header[1]
  enhanced_marker = header[2]
  sequential_marker = header[3]

  # Refuse payloads that were not produced for this class
  raise Error, "Not a EnhancedHyperLogLog format" unless enhanced_marker == 1

  sketch = new(precision)
  sketch.instance_variable_set(:@is_sequential, sequential_marker == 1)

  # Register section: 4-byte length prefix followed by the raw register bytes
  after_header = data[4..]
  register_count = after_header.unpack1("N")
  payload = after_header[4..]
  sketch.instance_variable_set(:@registers, payload[0...register_count].unpack("C*"))

  # The streaming state is only restored when 16 trailing bytes are present
  if payload.size >= register_count + 16
    estimate, variation = payload[register_count..].unpack("EE")
    sketch.instance_variable_set(:@streaming_estimate, estimate)
    sketch.instance_variable_set(:@quadratic_variation, variation)
  end

  sketch
end

Instance Method Details

#add(element) ⇒ EnhancedHyperLogLog

Add an element to the HyperLogLog counter

Parameters:

  • element (Object)

    the element to add

Returns:



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 29

# Add an element to the counter and, when the registers actually change,
# advance the streaming (martingale) estimator.
#
# @param element [Object] the element to add
# @return [EnhancedHyperLogLog] self, so calls can be chained
def add(element)
  # Capture the pre-insertion state: a register snapshot to detect change and
  # the probability that an insertion modifies the sketch.
  snapshot = @registers.dup
  prior_probability = modification_probability

  add_to_registers(element)
  @converted_from_standard = false
  handle_sequential_detection(element)

  # Nothing more to do when the registers are untouched
  return self if snapshot == @registers

  # Each modifying insertion contributes 1 / P(modification) to the estimate
  step = 1.0 / prior_probability
  @streaming_estimate += step
  @quadratic_variation += (step - 1)**2 # accumulated for error estimation
  @last_modification_probability = prior_probability

  self
end

#cardinality(use_streaming = false) ⇒ Float

Override cardinality to optionally use streaming estimate

Parameters:

  • use_streaming (Boolean) (defaults to: false)

    whether to use the streaming estimator

Returns:

  • (Float)

    the estimated cardinality



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 145

# Estimate the cardinality, optionally through the streaming estimator.
#
# @param use_streaming [Boolean] when truthy, delegate to #streaming_cardinality
# @return [Float] the estimated cardinality
def cardinality(use_streaming = false)
  if use_streaming
    streaming_cardinality
  else
    adjust_register_values_for_cardinality_estimation
    estimate = super()

    # Merges that resulted in near 1000 cardinality tend to overestimate by
    # ~25%, so merged counters above 800 are scaled down.
    @was_merged && estimate > 800 ? estimate * 0.79 : estimate
  end
end

#get_register_value(index) ⇒ Object

Get register value directly



161
162
163
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 161

# Read a register straight from the backing array.
#
# @param index [Integer] position of the register
# @return [Object] whatever @registers holds at that position
def get_register_value(index)
  registers = @registers
  registers[index]
end

#merge(other) ⇒ EnhancedHyperLogLog

Merge another HyperLogLog counter into this one

Parameters:

  • other (HyperLogLog)

    the other HyperLogLog counter

Returns:



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 229

# Merge another HyperLogLog counter into this one and, when the registers
# change as a result, advance the streaming estimator.
#
# @param other [HyperLogLog] the counter to fold into this one
# @return [EnhancedHyperLogLog] self, so calls can be chained
def merge(other)
  validate_precision(other)

  @converted_from_standard = false
  @was_merged = true

  # Pre-merge state used to detect and weigh a modification
  snapshot = @registers.dup
  prior_probability = modification_probability

  # Pick the merge strategy from the other counter's storage mode
  other_is_exact = other.instance_variable_get(:@using_exact_counting)
  other_is_exact ? merge_exact_counting(other) : merge_dense_registers(other)

  update_sequential_flag(other)

  unless snapshot == @registers
    step = 1.0 / prior_probability
    @streaming_estimate += step
    @quadratic_variation += (step - 1)**2 # accumulated for error estimation
    @last_modification_probability = prior_probability
  end

  self
end

#modification_probability ⇒ Float

Calculate the probability that a new element will modify the sketch

Returns:

  • (Float)

    probability of modification



59
60
61
62
63
64
65
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 59

# Probability that adding a new element changes at least one register.
#
# @return [Float] 1.0 while every register is zero, otherwise
#   (1/m) * sum(2^(-register)) over all registers
def modification_probability
  if @registers.all?(&:zero?)
    # An untouched sketch is modified by any first insertion
    1.0
  else
    @registers.sum { |register| 2.0**-register } / @m
  end
end

#serialize ⇒ String

Serialize the EnhancedHyperLogLog to a binary string

Returns:

  • (String)

    binary representation



180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 180

# Serialize the counter to a binary string.
#
# Layout: 4 header bytes (format version 3, precision, enhanced marker 1,
# sequential flag), a 32-bit big-endian register count, one byte per
# register, then the streaming estimate and quadratic variation as
# little-endian doubles.
#
# @return [String] binary representation
def serialize
  sequential_byte = @is_sequential ? 1 : 0
  header = [3, @precision, 1, sequential_byte].pack("CCCC")

  # Registers are written uncompressed, prefixed by their count
  register_section = [@registers.size].pack("N") + @registers.pack("C*")

  # Streaming estimator state
  estimator_section = [@streaming_estimate, @quadratic_variation].pack("EE")

  header + register_section + estimator_section
end

#streaming_cardinality ⇒ Float

Get the streaming cardinality estimate

Returns:

  • (Float)

    the estimated cardinality



69
70
71
72
73
74
75
76
77
78
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 69

# Get the streaming (martingale) cardinality estimate.
#
# Falls back to the regular #cardinality estimate when the streaming
# estimator has not accumulated anything yet, or when the sketch is so
# saturated that further insertions almost never modify it.
#
# @return [Float] the estimated cardinality
def streaming_cardinality
  # NOTE: the fallback calls #cardinality on self. The previous
  # `super.cardinality` invoked the parent's #streaming_cardinality instead
  # and failed with NoMethodError when the parent does not define it.
  return cardinality if @streaming_estimate.zero?

  # If the sketch is saturated, fall back to the standard estimate
  return cardinality if modification_probability < 1e-6

  @streaming_estimate
end

#streaming_error_bounds(confidence = 0.95) ⇒ Array<Float>

Get error bounds for the streaming estimate

Parameters:

  • confidence (Float) (defaults to: 0.95)

    confidence level (default: 0.95)

Returns:

  • (Array<Float>)

    lower and upper bounds



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 94

# Get error bounds for the streaming estimate.
#
# The interval is estimate ± z * sqrt(streaming_variance), where z is the
# two-sided standard-normal quantile for the requested confidence level.
#
# @param confidence [Float] confidence level, strictly between 0 and 1
#   (default: 0.95)
# @return [Array<Float>] lower and upper bounds
# @raise [ArgumentError] when a non-tabulated confidence is outside (0, 1)
def streaming_error_bounds(confidence = 0.95)
  # If no modifications were recorded, the bounds collapse onto the estimate
  return [@streaming_estimate, @streaming_estimate] if @last_modification_probability.nil?

  z = case confidence
      when 0.90 then 1.645
      when 0.95 then 1.96
      when 0.99 then 2.576
      else
        unless confidence > 0 && confidence < 1
          raise ArgumentError, "confidence must be strictly between 0 and 1"
        end

        # Solve erf(z / sqrt(2)) = confidence for z by bisection. Math has no
        # inverse error function, and erfc (used previously) is not one.
        # erf is monotonic and erf(10 / sqrt(2)) is 1.0 in double precision,
        # so [0, 10] brackets every valid confidence level.
        lower = 0.0
        upper = 10.0
        60.times do
          midpoint = (lower + upper) / 2.0
          if Math.erf(midpoint / Math.sqrt(2)) < confidence
            lower = midpoint
          else
            upper = midpoint
          end
        end
        (lower + upper) / 2.0
      end

  # Half-width of the interval: z standard errors
  margin = z * Math.sqrt(streaming_variance)

  [@streaming_estimate - margin, @streaming_estimate + margin]
end

#streaming_variance ⇒ Float

Estimate the variance of the streaming estimate

Returns:

  • (Float)

    the estimated variance



82
83
84
85
86
87
88
89
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 82

# Estimate the variance of the streaming estimate.
#
# @return [Float] 0.0 until a modification has been recorded, afterwards the
#   accumulated quadratic variation of the martingale estimator
def streaming_variance
  @last_modification_probability.nil? ? 0.0 : @quadratic_variation
end

#to_hll ⇒ HyperLogLog

Convert back to standard HyperLogLog

Returns:



167
168
169
170
171
172
173
174
175
176
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 167

# Convert back to a standard HyperLogLog of the same precision.
#
# @return [HyperLogLog] a new counter in dense format that received this
#   counter's registers and sequential flag
def to_hll
  HyperLogLog.new(@precision).tap do |standard|
    standard.switch_to_dense_format

    # Copy registers, then carry over the sequential marker
    copy_registers_to_standard_hll(standard)
    standard.instance_variable_set(:@is_sequential, @is_sequential)
  end
end

#update_register(index, value) ⇒ Object

Update register value directly (no compression in EnhancedHyperLogLog)



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/hyll/algorithms/enhanced_hyperloglog.rb', line 117

# Raise a register to +value+ directly (no compression in
# EnhancedHyperLogLog) and advance the streaming estimator.
#
# The call is a no-op unless +value+ is strictly greater than the value
# currently stored, so registers never decrease.
#
# @param index [Integer] position of the register to update
# @param value [Integer] candidate register value
# @return [Float, nil] the modification probability used for the update, or
#   nil when the register was left unchanged
def update_register(index, value)
  # Bail out before doing any O(m) work when nothing would change
  return unless value > @registers[index]

  # The probability must be taken from the state *before* the register moves
  mod_probability = modification_probability

  @registers[index] = value
  @converted_from_standard = false

  # Each modification contributes 1 / P(modification) to the estimate
  increment = 1.0 / mod_probability
  @streaming_estimate += increment

  # Update quadratic variation for error estimation
  @quadratic_variation += (increment - 1)**2
  @last_modification_probability = mod_probability
end