Class: PulsarSdk::Client::Rpc
- Inherits:
- Object
- Inheritance chain: Object → PulsarSdk::Client::Rpc
- Includes:
- Tweaks::CleanInspect
- Defined in:
- lib/pulsar_sdk/client/rpc.rb
Instance Method Summary
- #close ⇒ Object
- #connection(logical_addr = nil, physical_addr = nil) ⇒ Object
- #create_producer(opts) ⇒ Object
- #create_reader(opts = {}) ⇒ Object
- #initialize(opts) ⇒ Rpc (constructor): a new instance of Rpc.
- #lookup(topic) ⇒ Object
- #namespace_topics(namespace) ⇒ Object
- #partition_topics(topic) ⇒ Object
- #request(physical_addr, logical_addr, cmd) ⇒ Object
- #request_any_broker(cmd) ⇒ Object
- #subscribe(opts) ⇒ Object
Methods included from Tweaks::CleanInspect
Constructor Details
#initialize(opts) ⇒ Rpc
Returns a new instance of Rpc.
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 6

# Builds an Rpc client around a connection pool.
#
# @param opts [PulsarSdk::Options::Connection] connection options, stored in @opts
# @raise [RuntimeError] when opts is not a PulsarSdk::Options::Connection
def initialize(opts)
  unless opts.is_a?(PulsarSdk::Options::Connection)
    raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}"
  end

  @opts = opts

  # Build the pool and call run_checker on it before it is stored in @cnx.
  pool = ::PulsarSdk::Client::ConnectionPool.new(opts)
  pool.run_checker
  @cnx = pool

  # Both counters start at zero; no method in this listing reads them back.
  @producer_id = 0
  @consumer_id = 0
end
Instance Method Details
#close ⇒ Object
44 45 46 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 44

# Closes the connection pool held in @cnx.
#
# @return [Object] whatever the pool's #close returns
def close
  @cnx.close
end
#connection(logical_addr = nil, physical_addr = nil) ⇒ Object
17 18 19 20 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 17

# Fetches a connection from the pool.
#
# @param logical_addr [Object, nil] falls back to @opts.logical_addr when nil/false
# @param physical_addr [Object, nil] forwarded to the pool unchanged
# @return [Object] the result of @cnx.fetch
def connection(logical_addr = nil, physical_addr = nil)
  target = logical_addr || @opts.logical_addr
  @cnx.fetch(target, physical_addr)
end
#create_producer(opts) ⇒ Object
48 49 50 51 52 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 48

# Creates a producer bound to this client.
#
# @param opts [PulsarSdk::Options::Producer] producer options
# @return [Object] the result of PulsarSdk::Producer.create(self, opts)
# @raise [RuntimeError] when opts is not a PulsarSdk::Options::Producer
def create_producer(opts)
  unless opts.is_a?(PulsarSdk::Options::Producer)
    raise "opts expected a PulsarSdk::Options::Producer got #{opts.class}"
  end

  # FIXME check if connection ready
  ::PulsarSdk::Producer.create(self, opts)
end
#create_reader(opts = {}) ⇒ Object
62 63 64 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 62

# Placeholder with no implementation: opts is ignored and nil is returned.
#
# @param opts [Hash] currently unused
# @return [nil]
def create_reader(opts = {})
  nil
end
#lookup(topic) ⇒ Object
22 23 24 25 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 22

# Looks up a topic through a lazily created, memoised lookup service.
#
# @param topic [Object] forwarded to the lookup service
# @return [Object] the result of the service's #lookup
def lookup(topic)
  # The service is built once, from this client and @opts.logical_addr, then reused.
  service = (@lookup_service ||= ::PulsarSdk::Protocol::Lookup.new(self, @opts.logical_addr))
  service.lookup(topic)
end
#namespace_topics(namespace) ⇒ Object
27 28 29 30 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 27

# Lists topics of a namespace through a lazily created, memoised namespace service.
#
# @param namespace [Object] forwarded to the namespace service
# @return [Object] the result of the service's #topics
def namespace_topics(namespace)
  # The service is built once from this client and then reused.
  service = (@namespace_service ||= ::PulsarSdk::Protocol::Namespace.new(self))
  service.topics(namespace)
end
#partition_topics(topic) ⇒ Object
32 33 34 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 32

# Returns the partitions reported for a topic, or an empty array when none are reported.
#
# @param topic [Object] forwarded to PulsarSdk::Protocol::Partitioned.new
# @return [Object, Array] the #partitions value, or [] when it is nil/false
def partition_topics(topic)
  partitioned = ::PulsarSdk::Protocol::Partitioned.new(self, topic)

  # NOTE(review): `.new` ordinarily never returns nil, so this safe-navigation
  # guard looks redundant; confirm Partitioned does not override `new`.
  found = partitioned&.partitions
  found || []
end
#request(physical_addr, logical_addr, cmd) ⇒ Object
36 37 38 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 36

# Sends a command over the connection obtained for the given address pair.
#
# @param physical_addr [Object] passed as the FIRST argument to #connection
# @param logical_addr [Object] passed as the SECOND argument to #connection
# @param cmd [Object] the command to send
# @return [Object] the result of the connection's #request(cmd, nil, true)
def request(physical_addr, logical_addr, cmd)
  # NOTE(review): #connection is declared as (logical_addr, physical_addr), yet
  # the physical address is passed first here. Either the parameter names in
  # this signature are swapped or the arguments are; verify against callers
  # before changing. The original positional order is preserved.
  cnx = connection(physical_addr, logical_addr)
  cnx.request(cmd, nil, true)
end
#request_any_broker(cmd) ⇒ Object
40 41 42 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 40

# Sends a command over the connection returned by #connection with no arguments.
#
# @param cmd [Object] the command to send
# @return [Object] the result of the connection's #request(cmd)
def request_any_broker(cmd)
  cnx = connection
  cnx.request(cmd)
end
#subscribe(opts) ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/pulsar_sdk/client/rpc.rb', line 54

# Creates a consumer bound to this client.
#
# @param opts [PulsarSdk::Options::Consumer] consumer options
# @return [Object] the result of PulsarSdk::Consumer.create(self, opts)
# @raise [RuntimeError] when opts is not a PulsarSdk::Options::Consumer
def subscribe(opts)
  unless opts.is_a?(PulsarSdk::Options::Consumer)
    raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}"
  end

  # FIXME check if connection ready
  ::PulsarSdk::Consumer.create(self, opts)
end