Class: PulsarSdk::Protocol::Lookup
- Inherits:
-
Object
- Object
- PulsarSdk::Protocol::Lookup
- Defined in:
- lib/pulsar_sdk/protocol/lookup.rb
Constant Summary collapse
- MAX_LOOKUP_TIMES =
20
Instance Method Summary collapse
-
#initialize(client, service_url) ⇒ Lookup
constructor
A new instance of Lookup.
-
#lookup(topic) ⇒ Object
output [logical_addr, physical_addr].
Constructor Details
#initialize(client, service_url) ⇒ Lookup
Returns a new instance of Lookup.
6 7 8 9 |
# File 'lib/pulsar_sdk/protocol/lookup.rb', line 6 def initialize(client, service_url) @client = client @service_url = service_url end |
Instance Method Details
#lookup(topic) ⇒ Object
output
[logical_addr, physical_addr]
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pulsar_sdk/protocol/lookup.rb', line 13 def lookup(topic) base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::LOOKUP, lookupTopic: Pulsar::Proto::CommandLookupTopic.new( topic: topic, authoritative: false ) ) resp = @client.request_any_broker(base_cmd).lookupTopicResponse # 最多查找这么多次 MAX_LOOKUP_TIMES.times do case Pulsar::Proto::CommandLookupTopicResponse::LookupType.resolve(resp.response) when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Failed PulsarSdk.logger.error(__method__){"Failed to lookup topic 「#{topic}」, #{resp.error}"} break when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Redirect logical_addr, physical_addr = extract_addr(resp) base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::LOOKUP, lookupTopic: Pulsar::Proto::CommandLookupTopic.new( topic: topic, authoritative: resp. ) ) # NOTE 从连接池拿 resp = @client.request(logical_addr, physical_addr, base_cmd).lookupTopicResponse when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Connect return extract_addr(resp) end end end |