Class: PulsarSdk::Producer::Base

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/producer/base.rb

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(client, opts) ⇒ Base

Returns a new instance of Base.



6
7
8
9
# File 'lib/pulsar_sdk/producer/base.rb', line 6

# Builds a producer from a client handle and an options object.
#
# Nothing is connected here: both arguments are only stored, and are read
# later when the producer grabs a connection.
#
# @param client [Object] used later for topic lookup and to obtain a connection
# @param opts [Object] producer options; +topic+ and +name+ are read from it later
def initialize(client, opts)
  @client = client
  @opts = opts
end

Instance Method Details

#close ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/pulsar_sdk/producer/base.rb', line 49

# Shuts the producer down. Calling it again after a successful close is a no-op.
#
# Steps, in order:
# 1. sends a CLOSE_PRODUCER command unless the producer is disconnected,
# 2. unbinds the producer's handler,
# 3. marks the producer as stopped,
# 4. closes the receipt queue, if one was ever created.
#
# @return [Object, nil] whatever the receipt queue's +close+ returns, or +nil+
#   when the producer was already stopped or never created a queue
def close
  return if @stoped

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CLOSE_PRODUCER,
    close_producer: Pulsar::Proto::CommandCloseProducer.new
  )
  # No point sending the command when there is no established connection.
  execute(base_cmd) unless disconnect?

  unbind_handler!

  @stoped = true

  # The queue is only assigned in grab_cnx; a producer closed before it ever
  # connected has none, and must not blow up with NoMethodError on nil.
  @receipt_queue.close if @receipt_queue
end

#disconnect? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/pulsar_sdk/producer/base.rb', line 65

# Tells whether the producer currently has no established connection.
#
# @return [Boolean] +false+ once +@established+ is truthy, +true+ otherwise
def disconnect?
  @established ? false : true
end

#execute(cmd, msg = nil, timeout = nil) ⇒ Object



28
29
30
# File 'lib/pulsar_sdk/producer/base.rb', line 28

# Hands a command to +write+ with the async flag turned off.
#
# NOTE(review): presumably this waits for the broker's reply, with +timeout+
# bounding that wait; confirm against +write+.
#
# @param cmd [Object] command to send
# @param msg [Object, nil] optional message passed along with the command
# @param timeout [Object, nil] forwarded to +write+ unchanged
# @return [Object] whatever +write+ returns
def execute(cmd, msg = nil, timeout = nil)
  async = false
  write(cmd, msg, async, timeout)
end

#execute_async(cmd, msg = nil) ⇒ Object



32
33
34
# File 'lib/pulsar_sdk/producer/base.rb', line 32

# Hands a command to +write+ with the async flag turned on.
# No timeout is passed, so +write+ uses its own default for it.
#
# @param cmd [Object] command to send
# @param msg [Object, nil] optional message passed along with the command
# @return [Object] whatever +write+ returns
def execute_async(cmd, msg = nil)
  async = true
  write(cmd, msg, async)
end

#grab_cnx ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/pulsar_sdk/producer/base.rb', line 11

# Obtains a connection for the configured topic and (re)initialises all
# per-connection producer state.
#
# The order matters: the connection, producer id, provisional producer name,
# receipt queue and stopped flag are all set before +init_producer+ is
# called, and the value it returns becomes the final producer name.
#
# @return [Object] the producer name returned by +init_producer+
def grab_cnx
  topic = @opts.topic

  # Look the topic up, then open a connection to whatever lookup returned.
  lookup_result = @client.lookup(topic)
  @conn = @client.connection(*lookup_result)
  @established = true

  # Producer ids come from a generator built on the connection's own one.
  @seq_generator = SeqGenerator.new(@conn.seq_generator)
  @producer_id = @seq_generator.new_producer_id

  # Provisional name "<configured name>.<producer id>"; replaced below.
  @producer_name = [@opts.name, @producer_id].join('.')

  @receipt_queue = ReceiptQueue.new
  @stoped = false

  @producer_name = init_producer(topic)
end

#receipt ⇒ Object

获取发送回执 (fetches a send receipt). TODO: get receipt by sequence_id



38
39
40
41
42
43
44
45
46
47
# File 'lib/pulsar_sdk/producer/base.rb', line 38

# Takes the next entry off the receipt queue and returns its first element
# (the receipt). When a block is given, the receipt is yielded to it first.
#
# TODO: get receipt by sequence_id
#
# @yieldparam receipt_ [Object] the receipt, only when one is available
# @return [Object, nil] the receipt, or +nil+ when the queue produced nothing
#   or the entry's first element is +nil+
def receipt
  entry = @receipt_queue.pop
  # pop may hand back nil instead of an entry (a Thread::Queue-style queue
  # does so once closed and drained; TODO confirm for ReceiptQueue).
  # Treat that as "no receipt" rather than failing on nil.first.
  return if entry.nil?

  receipt_ = entry.first
  return if receipt_.nil?

  yield receipt_ if block_given?

  receipt_
end