Class: PulsarSdk::Client::Connection

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/client/connection.rb

Defined Under Namespace

Classes: ConsumerHandler, ProducerHandler, ResponseContainer, SeqGenerator, Status

Constant Summary collapse

CLIENT_NAME =
"pulsar-client-#{PulsarSdk::VERSION}".freeze
PROTOCOL_VER =
Pulsar::Proto::ProtocolVersion::V13

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(opts) ⇒ Connection

opts PulsarSdk::Options::Connection



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/pulsar_sdk/client/connection.rb', line 17

def initialize(opts)
  @conn_options = opts

  @socket = nil
  @state = Status.new

  @seq_generator = SeqGenerator.new

  @consumer_handlers = ConsumerHandler.new
  @producer_handlers = ProducerHandler.new
  @response_container = ResponseContainer.new
end

Instance Attribute Details

#consumer_handlersObject (readonly)

Returns the value of attribute consumer_handlers.



11
12
13
# File 'lib/pulsar_sdk/client/connection.rb', line 11

def consumer_handlers
  @consumer_handlers
end

#producer_handlersObject (readonly)

Returns the value of attribute producer_handlers.



12
13
14
# File 'lib/pulsar_sdk/client/connection.rb', line 12

def producer_handlers
  @producer_handlers
end

#response_containerObject (readonly)

用于处理状态回调



13
14
15
# File 'lib/pulsar_sdk/client/connection.rb', line 13

def response_container
  @response_container
end

#seq_generatorObject (readonly)

Returns the value of attribute seq_generator.



14
15
16
# File 'lib/pulsar_sdk/client/connection.rb', line 14

def seq_generator
  @seq_generator
end

Class Method Details

.establish(opts) ⇒ Object



36
37
38
39
40
41
42
# File 'lib/pulsar_sdk/client/connection.rb', line 36

def self.establish(opts)
  conn = new(opts).tap do |c|
    c.start
  end
  # TODO check connection ready
  conn
end

Instance Method Details

#active_statusObject



69
70
71
# File 'lib/pulsar_sdk/client/connection.rb', line 69

def active_status
  [@state.last_ping_at, @state.last_received_at]
end

#closeObject



44
45
46
47
48
49
50
51
52
# File 'lib/pulsar_sdk/client/connection.rb', line 44

def close
  @state.closed!
  consumer_handlers.each{|_k, v| v.call}
  producer_handlers.each{|_k, v| v.call}
  Timeout::timeout(2) {@pong&.join} rescue @pong&.kill
  @pong&.join
ensure
  @socket.close
end

#closed?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/pulsar_sdk/client/connection.rb', line 54

def closed?
  @state.closed?
end

#pingObject



58
59
60
61
62
63
64
65
66
67
# File 'lib/pulsar_sdk/client/connection.rb', line 58

def ping
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PING,
    ping: Pulsar::Proto::CommandPing.new
  )

  request(base_cmd, nil, true)

  @state.ping!
end

#request(cmd, msg = nil, async = false, timeout = nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pulsar_sdk/client/connection.rb', line 73

def request(cmd, msg = nil, async = false, timeout = nil)
  raise 'connection was closed!' if closed?

  cmd.seq_generator ||= @seq_generator

  # NOTE try to auto set *_id
  cmd.handle_ids

  frame = PulsarSdk::Protocol::Frame.encode(cmd, msg)
  write(frame)
  return true if async

  if request_id = cmd.get_request_id
    return @response_container.delete(request_id, timeout)
  end

  true
end

#startObject



30
31
32
33
34
# File 'lib/pulsar_sdk/client/connection.rb', line 30

def start
  unless connect && do_hand_shake && listen
    @state.closed!
  end
end