Class: PulsarSdk::Client::Rpc

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/client/rpc.rb

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(opts) ⇒ Rpc

Returns a new instance of Rpc.



6
7
8
9
10
11
12
13
14
15
# File 'lib/pulsar_sdk/client/rpc.rb', line 6

# Sets up the RPC client: stores the connection options, builds the
# connection pool and resets the producer/consumer id counters to 0.
#
# @param opts [PulsarSdk::Options::Connection] connection options
# @raise [RuntimeError] if opts is not a PulsarSdk::Options::Connection
def initialize(opts)
  unless opts.is_a?(PulsarSdk::Options::Connection)
    raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}"
  end

  @opts = opts

  # run_checker is invoked on the pool before it is stored in @cnx.
  pool = ::PulsarSdk::Client::ConnectionPool.new(opts)
  pool.run_checker
  @cnx = pool

  @producer_id = 0
  @consumer_id = 0
end

Instance Method Details

#close ⇒ Object



44
45
46
# File 'lib/pulsar_sdk/client/rpc.rb', line 44

# Closes the connection pool held by this client.
#
# @return [Object] whatever the pool's #close returns
def close
  pool = @cnx
  pool.close
end

#connection(logical_addr = nil, physical_addr = nil) ⇒ Object



17
18
19
20
# File 'lib/pulsar_sdk/client/rpc.rb', line 17

# Fetches a connection from the pool.
#
# @param logical_addr [Object, nil] falls back to @opts.logical_addr when
#   nil (or false)
# @param physical_addr [Object, nil] forwarded to the pool unchanged
# @return [Object] the result of the pool's #fetch
def connection(logical_addr = nil, physical_addr = nil)
  target_addr = logical_addr || @opts.logical_addr
  @cnx.fetch(target_addr, physical_addr)
end

#create_producer(opts) ⇒ Object



48
49
50
51
52
# File 'lib/pulsar_sdk/client/rpc.rb', line 48

# Creates a producer bound to this client.
#
# @param opts [PulsarSdk::Options::Producer] producer options
# @return [Object] the result of PulsarSdk::Producer.create
# @raise [RuntimeError] if opts is not a PulsarSdk::Options::Producer
def create_producer(opts)
  unless opts.is_a?(PulsarSdk::Options::Producer)
    raise "opts expected a PulsarSdk::Options::Producer got #{opts.class}"
  end

  # FIXME: check if connection ready
  ::PulsarSdk::Producer.create(self, opts)
end

#create_reader(opts = {}) ⇒ Object



62
63
64
# File 'lib/pulsar_sdk/client/rpc.rb', line 62

# Placeholder: reader creation is not implemented here. The options are
# ignored and nil is always returned.
#
# @param opts [Hash] currently unused
# @return [nil]
def create_reader(opts = {})
  nil
end

#lookup(topic) ⇒ Object



22
23
24
25
# File 'lib/pulsar_sdk/client/rpc.rb', line 22

# Looks up a topic through a lazily built, memoised
# PulsarSdk::Protocol::Lookup service.
#
# @param topic [Object] forwarded to the lookup service
# @return [Object] the result of the service's #lookup
def lookup(topic)
  # The service is constructed on first use with this client and the
  # configured logical address, then reused for later calls.
  service = (@lookup_service ||= ::PulsarSdk::Protocol::Lookup.new(self, @opts.logical_addr))
  service.lookup(topic)
end

#namespace_topics(namespace) ⇒ Object



27
28
29
30
# File 'lib/pulsar_sdk/client/rpc.rb', line 27

# Lists the topics of a namespace through a lazily built, memoised
# PulsarSdk::Protocol::Namespace service.
#
# @param namespace [Object] forwarded to the namespace service
# @return [Object] the result of the service's #topics
def namespace_topics(namespace)
  # Built once with this client, then reused on subsequent calls.
  service = (@namespace_service ||= ::PulsarSdk::Protocol::Namespace.new(self))
  service.topics(namespace)
end

#partition_topics(topic) ⇒ Object



32
33
34
# File 'lib/pulsar_sdk/client/rpc.rb', line 32

# Returns the partitions reported by PulsarSdk::Protocol::Partitioned for
# the given topic, or an empty array when none are reported.
#
# @param topic [Object] forwarded to PulsarSdk::Protocol::Partitioned.new
# @return [Object] the partitions, or [] when the value is nil/false
def partition_topics(topic)
  partitioned = ::PulsarSdk::Protocol::Partitioned.new(self, topic)
  found = partitioned&.partitions
  found || []
end

#request(physical_addr, logical_addr, cmd) ⇒ Object



36
37
38
# File 'lib/pulsar_sdk/client/rpc.rb', line 36

# Sends a command over the connection for the given broker addresses.
#
# @param physical_addr [Object] physical address of the target
# @param logical_addr [Object] logical address of the target
# @param cmd [Object] command forwarded to the connection's #request
# @return [Object] the result of the connection's #request
def request(physical_addr, logical_addr, cmd)
  # #connection is declared as (logical_addr, physical_addr), so the two
  # addresses must be handed over in that order; forwarding them in this
  # method's own parameter order would put each one in the other's slot.
  connection(logical_addr, physical_addr).request(cmd, nil, true)
end

#request_any_broker(cmd) ⇒ Object



40
41
42
# File 'lib/pulsar_sdk/client/rpc.rb', line 40

# Sends a command over the connection obtained from #connection with no
# explicit addresses.
#
# @param cmd [Object] command forwarded to the connection's #request
# @return [Object] the result of the connection's #request
def request_any_broker(cmd)
  broker = connection
  broker.request(cmd)
end

#subscribe(opts) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/pulsar_sdk/client/rpc.rb', line 54

# Creates a consumer bound to this client.
#
# @param opts [PulsarSdk::Options::Consumer] consumer options
# @return [Object] the result of PulsarSdk::Consumer.create
# @raise [RuntimeError] if opts is not a PulsarSdk::Options::Consumer
def subscribe(opts)
  unless opts.is_a?(PulsarSdk::Options::Consumer)
    raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}"
  end

  # FIXME: check if connection ready
  ::PulsarSdk::Consumer.create(self, opts)
end