Class: Streamdal::Tail
- Inherits:
-
Object
- Object
- Streamdal::Tail
- Defined in:
- lib/tail.rb
Instance Attribute Summary collapse
-
#active ⇒ Object
Returns the value of attribute active.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#request ⇒ Object
Returns the value of attribute request.
Instance Method Summary collapse
-
#initialize(request, streamdal_url, auth_token, log, metrics, active = false) ⇒ Tail
constructor
A new instance of Tail.
- #should_send ⇒ Object
- #start_tail_worker(worker_id) ⇒ Object
- #start_tail_workers ⇒ Object
- #stop_tail ⇒ Object
Constructor Details
#initialize(request, streamdal_url, auth_token, log, metrics, active = false) ⇒ Tail
Returns a new instance of Tail.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/tail.rb', line 10 def initialize(request, streamdal_url, auth_token, log, metrics, active = false) @request = request @streamdal_url = streamdal_url @auth_token = auth_token @logger = log @metrics = metrics @active = active @last_msg = Time.at(0) @queue = Queue.new @workers = [] # Only use rate limiting if sample_options is set return if request..nil? @limiter = BozosBuckets::Bucket.new( initial_token_count: request..sample_rate, refill_rate: request..sample_interval_seconds, max_token_count: request..sample_rate ) end |
Instance Attribute Details
#active ⇒ Object
Returns the value of attribute active.
8 9 10 |
# File 'lib/tail.rb', line 8 def active @active end |
#queue ⇒ Object
Returns the value of attribute queue.
8 9 10 |
# File 'lib/tail.rb', line 8 def queue @queue end |
#request ⇒ Object
Returns the value of attribute request.
8 9 10 |
# File 'lib/tail.rb', line 8 def request @request end |
Instance Method Details
#should_send ⇒ Object
85 86 87 88 89 |
# File 'lib/tail.rb', line 85 def should_send true if @limiter.nil? @limiter.use_tokens(1) end |
#start_tail_worker(worker_id) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/tail.rb', line 50 def start_tail_worker(worker_id) @logger.debug("Starting tail worker #{worker_id}") # Each worker gets it's own gRPC connection stub = Streamdal::Protos::Internal::Stub.new(@streamdal_url, :this_channel_is_insecure) while @active # If the queue is empty, sleep for a bit and loop again if @queue.empty? sleep(0.1) next end if Time.now - @last_msg < MIN_TAIL_RESPONSE_INTERVAL_MS sleep(MIN_TAIL_RESPONSE_INTERVAL_MS) @metrics.incr(Metrics::CounterEntry.new(COUNTER_DROPPED_TAIL_MESSAGES, nil, {}, 1)) @logger.debug("Dropped tail message for '#{@request.id}' due to rate limiting") next end next if stub.nil? tail_response = @queue.pop(false) @logger.debug("Sending tail request for '#{tail_response.tail_request_id}'") begin stub.send_tail([tail_response], metadata: { 'auth-token' => @auth_token }) rescue Error => e @logger.error("Error sending tail request: #{e}") end end @logger.debug "Tail worker #{worker_id} exited" end |
#start_tail_workers ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/tail.rb', line 32 def start_tail_workers NUM_TAIL_WORKERS.times do |worker_id| @workers << Thread.new { start_tail_worker(worker_id + 1) } end @active = true end |
#stop_tail ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/tail.rb', line 40 def stop_tail @active = false sleep(1) @workers.each do |worker| worker.exit if worker.alive? end end |