Class: PulsarSdk::Consumer::MessageTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar_sdk/consumer/message_tracker.rb

Defined Under Namespace

Classes: NackQueue, ReceivedQueue

Instance Method Summary collapse

Constructor Details

#initialize(redelivery_delay) ⇒ MessageTracker

Returns a new instance of MessageTracker.



8
9
10
11
12
13
14
15
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 8

def initialize(redelivery_delay)
  @redelivery_delay = redelivery_delay
  @received_message = ReceivedQueue.new
  @acknowledge_message = NackQueue.new {|parent, child| child[:ack_at] <=> parent[:ack_at] }
  @consumers = {}

  @tracker = track
end

Instance Method Details

#add_consumer(consumer) ⇒ Object



17
18
19
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 17

def add_consumer(consumer)
  @consumers[consumer.consumer_id] = consumer
end

#closeObject



47
48
49
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 47

def close
  @received_message.close
end

#receive(*args) ⇒ Object



21
22
23
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 21

def receive(*args)
  @received_message.add(*args)
end

#shift(timeout) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 25

def shift(timeout)
  cmd, meta_and_payload = @received_message.pop(timeout)

  return if cmd.nil?

  message = PulsarSdk::Protocol::Structure.new(meta_and_payload).decode

  consumer_id = cmd.message&.consumer_id
  real_consumer = @consumers[consumer_id]

  message.assign_attributes(
    message_id: cmd.message&.message_id,
    consumer_id: consumer_id,
    topic: real_consumer&.topic,
    ack_handler: ack_handler
  )

  real_consumer&.increase_fetched

  [cmd, message]
end