Class: PulsarSdk::Consumer::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar_sdk/consumer/base.rb

Defined Under Namespace

Classes: SeqGenerator

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, message_tracker, opts) ⇒ Base

Returns a new instance of Base.



6
7
8
9
10
11
# File 'lib/pulsar_sdk/consumer/base.rb', line 6

def initialize(client, message_tracker, opts)
  @opts = opts
  @topic = @opts.topic
  @message_tracker = message_tracker
  @client = client
end

Instance Attribute Details

#consumer_idObject (readonly)

Returns the value of attribute consumer_id.



4
5
6
# File 'lib/pulsar_sdk/consumer/base.rb', line 4

def consumer_id
  @consumer_id
end

#topicObject (readonly)

Returns the value of attribute topic.



4
5
6
# File 'lib/pulsar_sdk/consumer/base.rb', line 4

def topic
  @topic
end

Instance Method Details

#closeObject



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/pulsar_sdk/consumer/base.rb', line 65

def close
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CLOSE_CONSUMER,
    close_consumer: Pulsar::Proto::CommandCloseConsumer.new(
      consumer_id: @consumer_id
    )
  )
  execute(base_cmd) unless disconnect?

  remove_handler!
end

#disconnect?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/pulsar_sdk/consumer/base.rb', line 77

def disconnect?
  !@established
end

#execute(cmd) ⇒ Object



81
82
83
# File 'lib/pulsar_sdk/consumer/base.rb', line 81

def execute(cmd)
  write(cmd)
end

#execute_async(cmd) ⇒ Object



85
86
87
# File 'lib/pulsar_sdk/consumer/base.rb', line 85

def execute_async(cmd)
  write(cmd, nil, true)
end

#flowObject



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pulsar_sdk/consumer/base.rb', line 35

def flow
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::FLOW,
    flow: Pulsar::Proto::CommandFlow.new(
      messagePermits: @prefetch
    )
  )

  execute(base_cmd)

  @capacity += @prefetch
end

#flow_if_needObject



60
61
62
63
# File 'lib/pulsar_sdk/consumer/base.rb', line 60

def flow_if_need
  return if @capacity > 0 && [@prefetch / 2, 1].max.ceil < (@capacity - @fetched)
  flow
end

#grab_cnxObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/pulsar_sdk/consumer/base.rb', line 13

def grab_cnx
  @prefetch = @opts.prefetch
  @fetched = 0
  @capacity = 0

  @conn = @client.connection(*@client.lookup(@topic))
  @established = true

  @seq_generator = SeqGenerator.new(@conn.seq_generator)

  @consumer_id = @seq_generator.new_consumer_id
  @consumer_name = @opts.name
  @subscription_name = @opts.subscription_name

  result = init_consumer
  @consumer_name = result.consumerName unless result.nil?
end

#increase_fetched(n = 1) ⇒ Object



31
32
33
# File 'lib/pulsar_sdk/consumer/base.rb', line 31

def increase_fetched(n = 1)
  @fetched += n
end

#subscriptionObject



48
49
50
# File 'lib/pulsar_sdk/consumer/base.rb', line 48

def subscription
  @subscription_name
end

#unsubscribeObject



52
53
54
55
56
57
58
# File 'lib/pulsar_sdk/consumer/base.rb', line 52

def unsubscribe
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::UNSUBSCRIBE,
    unsubscribe: Pulsar::Proto::CommandUnsubscribe.new
  )
  execute_async(base_cmd)
end