Class: PulsarSdk::Consumer::Base
- Inherits:
- Object
  - Object
  - PulsarSdk::Consumer::Base
- Defined in:
- lib/pulsar_sdk/consumer/base.rb
Defined Under Namespace
Classes: SeqGenerator
Instance Attribute Summary collapse
- #consumer_id ⇒ Object (readonly): Returns the value of attribute consumer_id.
- #topic ⇒ Object (readonly): Returns the value of attribute topic.
Instance Method Summary collapse
- #close ⇒ Object
- #disconnect? ⇒ Boolean
- #execute(cmd) ⇒ Object
- #execute_async(cmd) ⇒ Object
- #flow ⇒ Object
- #flow_if_need ⇒ Object
- #grab_cnx ⇒ Object
- #increase_fetched(n = 1) ⇒ Object
- #initialize(client, message_tracker, opts) ⇒ Base (constructor): Returns a new instance of Base.
- #subscription ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(client, message_tracker, opts) ⇒ Base
Returns a new instance of Base.
6 7 8 9 10 11 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 6 def initialize(client, message_tracker, opts) @opts = opts @topic = @opts.topic @message_tracker = message_tracker @client = client end |
Instance Attribute Details
#consumer_id ⇒ Object (readonly)
Returns the value of attribute consumer_id.
4 5 6 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 4 def consumer_id @consumer_id end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
4 5 6 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 4 def topic @topic end |
Instance Method Details
#close ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 65 def close base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::CLOSE_CONSUMER, close_consumer: Pulsar::Proto::CommandCloseConsumer.new( consumer_id: @consumer_id ) ) execute(base_cmd) unless disconnect? remove_handler! end |
#disconnect? ⇒ Boolean
77 78 79 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 77 def disconnect? !@established end |
#execute(cmd) ⇒ Object
81 82 83 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 81 def execute(cmd) write(cmd) end |
#execute_async(cmd) ⇒ Object
85 86 87 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 85 def execute_async(cmd) write(cmd, nil, true) end |
#flow ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 35 def flow base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::FLOW, flow: Pulsar::Proto::CommandFlow.new( messagePermits: @prefetch ) ) execute(base_cmd) @capacity += @prefetch end |
#flow_if_need ⇒ Object
60 61 62 63 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 60 def flow_if_need return if @capacity > 0 && [@prefetch / 2, 1].max.ceil < (@capacity - @fetched) flow end |
#grab_cnx ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 13 def grab_cnx @prefetch = @opts.prefetch @fetched = 0 @capacity = 0 @conn = @client.connection(*@client.lookup(@topic)) @established = true @seq_generator = SeqGenerator.new(@conn.seq_generator) @consumer_id = @seq_generator.new_consumer_id @consumer_name = @opts.name @subscription_name = @opts.subscription_name result = init_consumer @consumer_name = result.consumerName unless result.nil? end |
#increase_fetched(n = 1) ⇒ Object
31 32 33 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 31 def increase_fetched(n = 1) @fetched += n end |
#subscription ⇒ Object
48 49 50 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 48 def subscription @subscription_name end |
#unsubscribe ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/pulsar_sdk/consumer/base.rb', line 52 def unsubscribe base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::UNSUBSCRIBE, unsubscribe: Pulsar::Proto::CommandUnsubscribe.new ) execute_async(base_cmd) end |