Class: PulsarSdk::Protocol::Lookup

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar_sdk/protocol/lookup.rb

Constant Summary collapse

MAX_LOOKUP_TIMES =
20

Instance Method Summary collapse

Constructor Details

#initialize(client, service_url) ⇒ Lookup

Returns a new instance of Lookup.



6
7
8
9
# File 'lib/pulsar_sdk/protocol/lookup.rb', line 6

def initialize(client, service_url)
  @client = client
  @service_url = service_url
end

Instance Method Details

#lookup(topic) ⇒ Object

output

[logical_addr, physical_addr]


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pulsar_sdk/protocol/lookup.rb', line 13

def lookup(topic)
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::LOOKUP,
    lookupTopic: Pulsar::Proto::CommandLookupTopic.new(
      topic: topic,
      authoritative: false
    )
  )
  resp = @client.request_any_broker(base_cmd).lookupTopicResponse

  # 最多查找这么多次
  MAX_LOOKUP_TIMES.times do
    case Pulsar::Proto::CommandLookupTopicResponse::LookupType.resolve(resp.response)
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Failed
      PulsarSdk.logger.error(__method__){"Failed to lookup topic 「#{topic}」, #{resp.error}"}
      break
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Redirect
      logical_addr, physical_addr = extract_addr(resp)
      base_cmd = Pulsar::Proto::BaseCommand.new(
        type: Pulsar::Proto::BaseCommand::Type::LOOKUP,
        lookupTopic: Pulsar::Proto::CommandLookupTopic.new(
          topic: topic,
          authoritative: resp.authoritative
        )
      )
      # NOTE 从连接池拿
      resp = @client.request(logical_addr, physical_addr, base_cmd).lookupTopicResponse
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Connect
      return extract_addr(resp)
    end
  end
end