Class: PulsarSdk::Producer::Manager

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/producer/manager.rb

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(client, opts) ⇒ Manager

Returns a new instance of Manager.



6
7
8
9
10
# File 'lib/pulsar_sdk/producer/manager.rb', line 6

# Builds a manager from a client and an options object.
#
# @param client passed through unchanged to init_producer_by
# @param opts options object; must respond to #topic and #router, and is
#   also passed to init_producer_by
def initialize(client, opts)
  # Topic is kept for later use (e.g. the warning emitted by real_producer).
  @topic = opts.topic
  # Collection of producers built by init_producer_by (defined elsewhere in
  # this class); other methods call #each, #size and #[] on it.
  @producers = init_producer_by(client, opts)
  # Router consulted by real_producer to pick a producer index for a message.
  @router = opts.router
end

Instance Method Details

#close ⇒ Object



39
40
41
# File 'lib/pulsar_sdk/producer/manager.rb', line 39

# Calls #close on every managed producer, in collection order.
#
# @return the value of @producers.each (the producers collection itself
#   when it is an Array)
def close
  @producers.each { |producer| producer.close }
end

#execute(cmd, msg = nil, timeout = nil) ⇒ Object



12
13
14
15
16
17
# File 'lib/pulsar_sdk/producer/manager.rb', line 12

# Validates the command, then forwards it to the producer selected by
# real_producer for the given message.
#
# @param cmd [Pulsar::Proto::BaseCommand] command to run
# @param msg message handed to real_producer for routing and forwarded to
#   the producer (defaults to nil)
# @param timeout forwarded unchanged to the producer's #execute (defaults to nil)
# @return whatever real_producer returns for the given block
# @raise [RuntimeError] when cmd is not a Pulsar::Proto::BaseCommand
def execute(cmd, msg = nil, timeout = nil)
  unless cmd.is_a?(Pulsar::Proto::BaseCommand)
    raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}"
  end

  real_producer(msg) { |target| target.execute(cmd, msg, timeout) }
end

#execute_async(cmd, msg = nil) ⇒ Object



19
20
21
22
23
24
# File 'lib/pulsar_sdk/producer/manager.rb', line 19

# Validates the command, then forwards it to #execute_async on the producer
# selected by real_producer for the given message.
#
# @param cmd [Pulsar::Proto::BaseCommand] command to run
# @param msg message handed to real_producer for routing and forwarded to
#   the producer (defaults to nil)
# @return whatever real_producer returns for the given block
# @raise [RuntimeError] when cmd is not a Pulsar::Proto::BaseCommand
def execute_async(cmd, msg = nil)
  unless cmd.is_a?(Pulsar::Proto::BaseCommand)
    raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}"
  end

  real_producer(msg) { |target| target.execute_async(cmd, msg) }
end

#real_producer(msg) {|@producers[route_index]| ... } ⇒ Object

Yields:

  • (@producers[route_index])


26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/pulsar_sdk/producer/manager.rb', line 26

# Selects one producer for the message and yields it to the caller's block.
#
# When the producer collection is empty, a warning is logged and nothing is
# yielded. Otherwise ensure_connection is invoked first, and the producer
# index is 0 for a nil message or the router's answer for the message key.
#
# @param msg message whose #key is passed to the router; nil selects index 0
# @yieldparam producer the entry of @producers at the chosen index
# @return [nil] when there are no producers; otherwise the block's value
def real_producer(msg, &block)
  if @producers.size.zero?
    PulsarSdk.logger.warn(__method__) { "There is no available producer for topic: 「#{@topic}」, skipping action!" }
    return
  end

  ensure_connection

  # The size is read again here, after ensure_connection, rather than cached
  # from the emptiness check above.
  route_index =
    if msg.nil?
      0
    else
      @router.route(msg.key, @producers.size)
    end

  yield @producers[route_index]
end