Class: PulsarSdk::Consumer::MessageTracker
- Inherits:
-
Object
- Object
- PulsarSdk::Consumer::MessageTracker
show all
- Defined in:
- lib/pulsar_sdk/consumer/message_tracker.rb
Defined Under Namespace
Classes: NackQueue, ReceivedQueue
Instance Method Summary
collapse
Constructor Details
#initialize(redelivery_delay) ⇒ MessageTracker
Returns a new instance of MessageTracker.
8
9
10
11
12
13
14
15
|
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 8
# Sets up the tracker's internal state.
#
# @param redelivery_delay [Object] stored unchanged in @redelivery_delay;
#   its unit is not visible in this method (presumably seconds — confirm
#   against the code that reads it).
def initialize(redelivery_delay)
  @redelivery_delay = redelivery_delay
  # Consumers registered through #add_consumer, keyed by consumer id.
  @consumers = {}
  @received_message = ReceivedQueue.new
  # The block compares two entries by their :ack_at value, child against parent.
  @acknowledge_message = NackQueue.new do |parent, child|
    child[:ack_at] <=> parent[:ack_at]
  end
  # `track` is defined elsewhere in the class; it is invoked last so that
  # every instance variable above is already assigned when it runs.
  @tracker = track
end
|
Instance Method Details
#add_consumer(consumer) ⇒ Object
17
18
19
|
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 17
# Registers a consumer so it can later be looked up by its id.
#
# @param consumer [#consumer_id] the consumer to register
# @return [Object] the consumer that was passed in
def add_consumer(consumer)
  @consumers.store(consumer.consumer_id, consumer)
end
|
#close ⇒ Object
47
48
49
|
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 47
# Closes the queue of received messages.
#
# @return [Object] whatever the queue's #close returns
def close
  received = @received_message
  received.close
end
|
#receive(*args) ⇒ Object
21
22
23
|
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 21
# Hands the given arguments to the received-message queue.
#
# @param args [Array] forwarded unchanged to the queue's #add
# @return [Object] whatever the queue's #add returns
def receive(*args)
  received = @received_message
  received.add(*args)
end
|
#shift(timeout) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/pulsar_sdk/consumer/message_tracker.rb', line 25
# Takes the next entry off the received-message queue and decodes it.
#
# @param timeout [Object] passed straight to the queue's #pop
# @return [Array, nil] a two-element array of the command and the decoded
#   message, or nil when the queue yields no command
def shift(timeout)
  command, raw_frame = @received_message.pop(timeout)
  return if command.nil?

  decoded = PulsarSdk::Protocol::Structure.new(raw_frame).decode
  owner_id = command.message&.consumer_id
  # May be nil when no consumer was registered under this id; topic is
  # then left nil and the fetched counter is not touched.
  owner = @consumers[owner_id]

  decoded.assign_attributes(
    message_id: command.message&.message_id,
    consumer_id: owner_id,
    topic: owner&.topic,
    ack_handler: ack_handler
  )

  owner.increase_fetched unless owner.nil?
  [command, decoded]
end
|