Class: PulsarSdk::Consumer::Manager

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/consumer/manager.rb

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(client, opts) ⇒ Manager

Returns a new instance of Manager.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/pulsar_sdk/consumer/manager.rb', line 6

def initialize(client, opts)
  raise "client expected a PulsarSdk::Client::Rpc got #{client.class}" unless client.is_a?(PulsarSdk::Client::Rpc)
  raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Consumer)

  @topic = opts.topic

  @listen_wait = opts.listen_wait

  @message_tracker = ::PulsarSdk::Consumer::MessageTracker.new(opts.redelivery_delay)

  @consumers = init_consumer_by(client, opts)

  @stoped = false
end

Instance Method Details

#closeObject



67
68
69
70
71
72
73
74
# File 'lib/pulsar_sdk/consumer/manager.rb', line 67

def close
  PulsarSdk.logger.debug(__method__){"current @stoped #{@stoped} close now!"}
  return if @stoped
  @consumers.each(&:close)
  @stoped = true

  @message_tracker.close
end

#flowObject

NOTE some topic maybe have large permits if there is no message



22
23
24
25
# File 'lib/pulsar_sdk/consumer/manager.rb', line 22

def flow
  ensure_connection
  @consumers.each(&:flow_if_need)
end

#listen(autoack = false) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pulsar_sdk/consumer/manager.rb', line 44

def listen(autoack = false)
  raise 'listen require passing a block!!' if !block_given?
  ensure_connection

  loop do
    return if @stoped

    flow

    cmd, msg = receive(@listen_wait)
    return if msg.nil?

    result = yield cmd, msg

    if autoack && result == false
      msg.nack
      next
    end

    msg.ack if autoack
  end
end

#receive(timeout = nil) ⇒ Object

if timeout is nil wait until get message



39
40
41
42
# File 'lib/pulsar_sdk/consumer/manager.rb', line 39

def receive(timeout = nil)
  ensure_connection
  @message_tracker.shift(timeout)
end

#subscriptionObject

NOTE all consumers has same name



28
29
30
31
# File 'lib/pulsar_sdk/consumer/manager.rb', line 28

def subscription
  ensure_connection
  @consumers.find(&:subscription)
end

#unsubscribeObject



33
34
35
36
# File 'lib/pulsar_sdk/consumer/manager.rb', line 33

def unsubscribe
  ensure_connection
  @consumers.each(&:unsubscribe)
end