Class: PulsarSdk::Client::Connection
- Inherits:
-
Object
- Object
- PulsarSdk::Client::Connection
show all
- Includes:
- Tweaks::CleanInspect
- Defined in:
- lib/pulsar_sdk/client/connection.rb
Defined Under Namespace
Classes: ConsumerHandler, ProducerHandler, ResponseContainer, SeqGenerator, Status
Constant Summary
collapse
- CLIENT_NAME =
"pulsar-client-#{PulsarSdk::VERSION}".freeze
- PROTOCOL_VER =
Pulsar::Proto::ProtocolVersion::V13
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#inspect
Constructor Details
opts PulsarSdk::Options::Connection
Instance Attribute Details
#consumer_handlers ⇒ Object
Returns the value of attribute consumer_handlers.
11
12
13
|
# File 'lib/pulsar_sdk/client/connection.rb', line 11
def consumer_handlers
@consumer_handlers
end
|
#producer_handlers ⇒ Object
Returns the value of attribute producer_handlers.
12
13
14
|
# File 'lib/pulsar_sdk/client/connection.rb', line 12
def producer_handlers
@producer_handlers
end
|
#response_container ⇒ Object
13
14
15
|
# File 'lib/pulsar_sdk/client/connection.rb', line 13
def response_container
@response_container
end
|
#seq_generator ⇒ Object
Returns the value of attribute seq_generator.
14
15
16
|
# File 'lib/pulsar_sdk/client/connection.rb', line 14
def seq_generator
@seq_generator
end
|
Class Method Details
.establish(opts) ⇒ Object
36
37
38
39
40
41
42
|
# File 'lib/pulsar_sdk/client/connection.rb', line 36
def self.establish(opts)
conn = new(opts).tap do |c|
c.start
end
conn
end
|
Instance Method Details
#active_status ⇒ Object
69
70
71
|
# File 'lib/pulsar_sdk/client/connection.rb', line 69
def active_status
[@state.last_ping_at, @state.last_received_at]
end
|
#close ⇒ Object
44
45
46
47
48
49
50
51
52
|
# File 'lib/pulsar_sdk/client/connection.rb', line 44
def close
@state.closed!
consumer_handlers.each{|_k, v| v.call}
producer_handlers.each{|_k, v| v.call}
Timeout::timeout(2) {@pong&.join} rescue @pong&.kill
@pong&.join
ensure
@socket.close
end
|
#closed? ⇒ Boolean
54
55
56
|
# File 'lib/pulsar_sdk/client/connection.rb', line 54
def closed?
@state.closed?
end
|
#request(cmd, msg = nil, async = false, timeout = nil) ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/pulsar_sdk/client/connection.rb', line 73
def request(cmd, msg = nil, async = false, timeout = nil)
raise 'connection was closed!' if closed?
cmd.seq_generator ||= @seq_generator
cmd.handle_ids
frame = PulsarSdk::Protocol::Frame.encode(cmd, msg)
write(frame)
return true if async
if request_id = cmd.get_request_id
return @response_container.delete(request_id, timeout)
end
true
end
|
#start ⇒ Object
30
31
32
33
34
|
# File 'lib/pulsar_sdk/client/connection.rb', line 30
def start
unless connect && do_hand_shake && listen
@state.closed!
end
end
|