Class: PulsarSdk::Producer::Base
- Inherits: Object
- Hierarchy:
  - Object
  - PulsarSdk::Producer::Base
- Includes:
- Tweaks::CleanInspect
- Defined in:
- lib/pulsar_sdk/producer/base.rb
Instance Method Summary
- #close ⇒ Object
- #disconnect? ⇒ Boolean
- #execute(cmd, msg = nil, timeout = nil) ⇒ Object
- #execute_async(cmd, msg = nil) ⇒ Object
- #grab_cnx ⇒ Object
-
#initialize(client, opts) ⇒ Base
constructor
A new instance of Base.
-
#receipt ⇒ Object
Fetches a send receipt (获取发送回执). TODO: get receipt by sequence_id.
Methods included from Tweaks::CleanInspect
Constructor Details
#initialize(client, opts) ⇒ Base
Returns a new instance of Base.
6 7 8 9 |
# File 'lib/pulsar_sdk/producer/base.rb', line 6
#
# Builds a producer base by remembering its two collaborators. Nothing else
# happens here; the connection is set up later by #grab_cnx.
#
# @param client [Object] stored in @client (#grab_cnx calls #lookup and
#   #connection on it)
# @param opts [Object] stored in @opts (#grab_cnx reads #topic and #name
#   from it)
def initialize(client, opts)
  @client, @opts = client, opts
end
Instance Method Details
#close ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/pulsar_sdk/producer/base.rb', line 49
#
# Closes the producer. Calling it again after a successful close is a no-op.
#
# A CLOSE_PRODUCER command is sent through #execute only while the producer
# is still connected (see #disconnect?). Afterwards the handler is unbound,
# the producer is flagged as stopped and the receipt queue is closed.
#
# @return [Object, nil] nil when already stopped or when no receipt queue
#   exists; otherwise whatever the receipt queue's #close returns
def close
  return if @stoped

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CLOSE_PRODUCER,
    close_producer: Pulsar::Proto::CommandCloseProducer.new
  )
  execute(base_cmd) unless disconnect?

  unbind_handler!

  @stoped = true
  # @receipt_queue is only assigned in #grab_cnx, so it is still nil when the
  # producer never connected; safe navigation keeps close from raising
  # NoMethodError in that case.
  @receipt_queue&.close
end
#disconnect? ⇒ Boolean
65 66 67 |
# File 'lib/pulsar_sdk/producer/base.rb', line 65
#
# Tells whether the producer currently lacks an established connection.
# The @established flag is set to true by #grab_cnx; while it is nil or
# false this predicate answers true.
#
# @return [Boolean] true when @established is falsy, false otherwise
def disconnect?
  @established ? false : true
end
#execute(cmd, msg = nil, timeout = nil) ⇒ Object
28 29 30 |
# File 'lib/pulsar_sdk/producer/base.rb', line 28
#
# Hands a command (and optional message) to #write with the async flag off.
# NOTE(review): presumably this makes the call wait for the response, since
# #execute_async passes true in the same position; confirm against #write.
#
# @param cmd [Object] command forwarded to #write
# @param msg [Object, nil] optional message forwarded to #write
# @param timeout [Object, nil] forwarded to #write unchanged
# @return [Object] whatever #write returns
def execute(cmd, msg = nil, timeout = nil)
  async = false
  write(cmd, msg, async, timeout)
end
#execute_async(cmd, msg = nil) ⇒ Object
32 33 34 |
# File 'lib/pulsar_sdk/producer/base.rb', line 32
#
# Hands a command (and optional message) to #write with the async flag on.
# No timeout is passed, so #write falls back to its own default for it.
#
# @param cmd [Object] command forwarded to #write
# @param msg [Object, nil] optional message forwarded to #write
# @return [Object] whatever #write returns
def execute_async(cmd, msg = nil)
  async = true
  write(cmd, msg, async)
end
#grab_cnx ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/pulsar_sdk/producer/base.rb', line 11
#
# Obtains a connection for the configured topic and (re)initialises all
# per-connection producer state. The statements are order-dependent: the
# connection must exist before the sequence generator is derived from it,
# and the provisional producer name is set before #init_producer runs.
#
# @return [Object] the value returned by #init_producer, which also becomes
#   the final @producer_name
def grab_cnx
  topic_name = @opts.topic

  # The lookup result is splatted into the connection call.
  lookup_result = @client.lookup(topic_name)
  @conn = @client.connection(*lookup_result)
  @established = true

  @seq_generator = SeqGenerator.new(@conn.seq_generator)
  @producer_id = @seq_generator.new_producer_id

  # Provisional "<name>.<id>" label; replaced below by #init_producer's result.
  @producer_name = [@opts.name, @producer_id].join('.')

  @receipt_queue = ReceiptQueue.new
  @stoped = false

  @producer_name = init_producer(topic_name)
end
#receipt ⇒ Object
Fetches a send receipt (获取发送回执). TODO: get receipt by sequence_id.
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/pulsar_sdk/producer/base.rb', line 38
#
# Fetches the next send receipt from the receipt queue.
# TODO: get receipt by sequence_id.
#
# The queue entry is expected to respond to #first; only that first element
# is handed back. When a block is given it is called with the receipt before
# the receipt is returned.
#
# @yieldparam receipt_ [Object] the receipt, only when one is available
# @return [Object, nil] the receipt, or nil when the queue yields nothing
def receipt
  # pop may hand back nil instead of an entry (presumably once the queue has
  # been closed by #close; confirm against ReceiptQueue); safe navigation
  # turns that into the nil result callers already have to handle.
  receipt_ = @receipt_queue.pop&.first
  return if receipt_.nil?

  yield receipt_ if block_given?

  receipt_
end