Class: PushyClient::ProtocolHandler
- Inherits:
-
Object
- Object
- PushyClient::ProtocolHandler
show all
- Defined in:
- lib/pushy_client/protocol_handler.rb
Defined Under Namespace
Classes: TimeRecvWrapper, TimeSendWrapper
Constant Summary
collapse
- MAX_BODY_SIZE =
The maximum size, in bytes, allowed for a message body. This is not configurable because it is configurable (though not documented) on the server, and we don’t want to make users have to sync the two values. The max on the server is actually 65536, but we leave a little room since the server is measuring the signed message and we’re just counting the size of the stderr and stdout.
63000
Instance Attribute Summary collapse
Class Method Summary
collapse
-
.hmac_valid?(message, sig, session_key) ⇒ Boolean
-
.load_key(key_path) ⇒ Object
-
.make_header_hmac(json, session_key) ⇒ Object
-
.make_header_rsa(json, client_private_key) ⇒ Object
-
.rsa_valid?(message, sig, server_public_key) ⇒ Boolean
-
.send_signed_message(socket, method, client_private_key, session_key, message, client) ⇒ Object
-
.valid?(header, message, server_public_key, session_key) ⇒ Boolean
Message authentication (on receive).
Instance Method Summary
collapse
Constructor Details
Returns a new instance of ProtocolHandler.
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/pushy_client/protocol_handler.rb', line 71
def initialize(client)
@client = client
@socket_lock = Mutex.new
@receive_socket_lock = Mutex.new
client.on_server_availability_change do |available|
if !available
Thread.new do
begin
Chef::Log.info "[#{node_name}] Closing and reopening sockets since server is down ..."
reconfigure
Chef::Log.info "[#{node_name}] Done closing and reopening sockets."
rescue
client.log_exception("Error reconfiguring sockets when server went down", $!)
end
end
end
end
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
96
97
98
|
# File 'lib/pushy_client/protocol_handler.rb', line 96
def client
@client
end
|
#client_private_key ⇒ Object
Returns the value of attribute client_private_key.
102
103
104
|
# File 'lib/pushy_client/protocol_handler.rb', line 102
def client_private_key
@client_private_key
end
|
#command_address ⇒ Object
Returns the value of attribute command_address.
98
99
100
|
# File 'lib/pushy_client/protocol_handler.rb', line 98
def command_address
@command_address
end
|
#server_heartbeat_address ⇒ Object
Returns the value of attribute server_heartbeat_address.
97
98
99
|
# File 'lib/pushy_client/protocol_handler.rb', line 97
def server_heartbeat_address
@server_heartbeat_address
end
|
#server_public_key ⇒ Object
Returns the value of attribute server_public_key.
99
100
101
|
# File 'lib/pushy_client/protocol_handler.rb', line 99
def server_public_key
@server_public_key
end
|
#session_key ⇒ Object
Returns the value of attribute session_key.
100
101
102
|
# File 'lib/pushy_client/protocol_handler.rb', line 100
def session_key
@session_key
end
|
#session_method ⇒ Object
Returns the value of attribute session_method.
101
102
103
|
# File 'lib/pushy_client/protocol_handler.rb', line 101
def session_method
@session_method
end
|
Class Method Details
.hmac_valid?(message, sig, session_key) ⇒ Boolean
462
463
464
465
466
467
|
# File 'lib/pushy_client/protocol_handler.rb', line 462
def self.hmac_valid?(message, sig, session_key)
message_sig = OpenSSL::HMAC.digest('sha256', session_key, message)
sha = OpenSSL::Digest::SHA512.new
sha.digest(sig) == sha.digest(message_sig)
end
|
.load_key(key_path) ⇒ Object
512
513
514
515
|
# File 'lib/pushy_client/protocol_handler.rb', line 512
def self.load_key(key_path)
raw_key = IO.read(key_path).strip
OpenSSL::PKey::RSA.new(raw_key)
end
|
523
524
525
526
527
|
# File 'lib/pushy_client/protocol_handler.rb', line 523
def self.(json, session_key)
sig = OpenSSL::HMAC.digest('sha256', session_key, json)
b64_sig = Base64.encode64(sig).chomp
"Version:2.0;SigningMethod:hmac_sha256;Signature:#{b64_sig}"
end
|
517
518
519
520
521
|
# File 'lib/pushy_client/protocol_handler.rb', line 517
def self.(json, client_private_key)
checksum = Mixlib::Authentication::Digester.hash_string(json)
b64_sig = Base64.encode64(client_private_key.private_encrypt(checksum)).chomp
"Version:2.0;SigningMethod:rsa2048_sha1;Signature:#{b64_sig}"
end
|
.rsa_valid?(message, sig, server_public_key) ⇒ Boolean
456
457
458
459
460
|
# File 'lib/pushy_client/protocol_handler.rb', line 456
def self.rsa_valid?(message, sig, server_public_key)
decrypted_checksum = server_public_key.public_decrypt(sig)
hashed_message = Mixlib::Authentication::Digester.hash_string(message)
decrypted_checksum == hashed_message
end
|
.send_signed_message(socket, method, client_private_key, session_key, message, client) ⇒ Object
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
|
# File 'lib/pushy_client/protocol_handler.rb', line 483
def self.send_signed_message(socket, method, client_private_key, session_key, message, client)
auth = case method
when :rsa2048_sha1
(message, client_private_key)
when :hmac_sha256
(message, session_key)
end
socket.send_string(auth, ZMQ::SNDMORE | ZMQ::DONTWAIT)
rc = socket.send_string(message, ZMQ::DONTWAIT)
if rc == -1
Chef::Log.info("[#{client.node_name}] ZMQ socket enqueue error #{ZMQ::Util.errno}. Triggering reconfigure")
client.update_reconfigure_deadline(60)
end
end
|
.valid?(header, message, server_public_key, session_key) ⇒ Boolean
Message authentication (on receive)
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
|
# File 'lib/pushy_client/protocol_handler.rb', line 430
def self.valid?(, message, server_public_key, session_key)
= .split(';')
= .inject({}) do |a,e|
k,v = e.split(':')
a[k] = v
a
end
auth_sig = ["Signature"]
if !auth_sig
return false
end
binary_sig = Base64.decode64(auth_sig)
auth_method = ["SigningMethod"]
case auth_method
when "rsa2048_sha1"
rsa_valid?(message, binary_sig, server_public_key)
when "hmac_sha256"
hmac_valid?(message, binary_sig, session_key)
else
false
end
end
|
Instance Method Details
#node_name ⇒ Object
104
105
106
|
# File 'lib/pushy_client/protocol_handler.rb', line 104
def node_name
client.node_name
end
|
186
187
188
189
190
191
192
193
|
# File 'lib/pushy_client/protocol_handler.rb', line 186
def reconfigure
@socket_lock.synchronize do
@receive_socket_lock.synchronize do
internal_stop
start
end
end
end
|
#send_command(message_type, job_id, params) ⇒ Object
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/pushy_client/protocol_handler.rb', line 195
def send_command(message_type, job_id, params)
Chef::Log.debug("[#{node_name}] Sending command #{message_type} for job #{job_id}")
message = {
:node => node_name,
:client => client.hostname,
:protocol_version => PushyClient::PROTOCOL_VERSION,
:org => client.org_name,
:type => message_type,
:sequence => -1,
:timestamp => TimeSendWrapper.now.httpdate,
:incarnation_id => client.incarnation_id,
:job_id => job_id
}.merge(validate_params(params))
send_signed_json_command(:hmac_sha256, message)
end
|
#send_heartbeat(sequence) ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
|
# File 'lib/pushy_client/protocol_handler.rb', line 213
def send_heartbeat(sequence)
Chef::Log.debug("[#{node_name}] Sending heartbeat (sequence ##{sequence})")
job_state = client.job_state
message = {
:node => node_name,
:client => client.hostname,
:protocol_version => PushyClient::PROTOCOL_VERSION,
:org => client.org_name,
:type => :heartbeat,
:sequence => -1,
:timestamp => TimeSendWrapper.now.httpdate,
:incarnation_id => client.incarnation_id,
:job_state => job_state[:state],
:job_id => job_state[:job_id]
}
send_signed_json_command(:hmac_sha256, message)
end
|
#start ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/pushy_client/protocol_handler.rb', line 108
def start
server_address = URI(client.config['push_jobs']['heartbeat']['out_addr']).host
check_server_address(node_name, server_address)
@server_heartbeat_address = client.config['push_jobs']['heartbeat']['out_addr']
@command_address = client.config['push_jobs']['heartbeat']['command_addr']
@server_public_key = OpenSSL::PKey::RSA.new(client.config['public_key'])
@client_private_key = ProtocolHandler::load_key(client.client_key)
@max_message_skew = client.config['max_message_skew']
if client.using_curve
server_curve_pub_key = client.config['curve_public_key']
begin
@session_method = client.config['encoded_session_key']['method']
enc_session_key = Base64::decode64(client.config['encoded_session_key']['key'])
@session_key = @client_private_key.private_decrypt(enc_session_key)
rescue =>_
Chef::Log.error "[#{node_name}] No session key found in config"
exit(-1)
end
else
begin
@session_method = client.config['encoded_session_key']['method']
enc_session_key = Base64::decode64(client.config['encoded_session_key']['key'])
@session_key = @client_private_key.private_decrypt(enc_session_key)
rescue =>e
Chef::Log.error "[#{node_name}] No session key found in config"
exit(-1)
end
end
Chef::Log.info "[#{node_name}] Starting ZMQ version #{LibZMQ.version}"
Chef::Log.info "[#{node_name}] Listening for server heartbeat at #{@server_heartbeat_address}"
@server_heartbeat_socket = PushyClient::ZmqContext.socket(ZMQ::SUB)
@server_heartbeat_socket.connect(@server_heartbeat_address)
@server_heartbeat_socket.setsockopt(ZMQ::SUBSCRIBE, "")
@server_heartbeat_seq_no = -1
Chef::Log.info "[#{node_name}] Connecting to command channel at #{@command_address}"
@command_socket = PushyClient::ZmqContext.socket(ZMQ::DEALER)
@command_socket.setsockopt(ZMQ::LINGER, 0)
@command_socket.setsockopt(ZMQ::RCVHWM, 0)
@command_socket.setsockopt(ZMQ::SNDHWM, 3)
if client.using_curve
@command_socket.setsockopt(ZMQ::CURVE_SERVERKEY, server_curve_pub_key)
@command_socket.setsockopt(ZMQ::CURVE_PUBLICKEY, client.client_curve_pub_key)
@command_socket.setsockopt(ZMQ::CURVE_SECRETKEY, client.client_curve_sec_key)
end
@command_socket.connect(@command_address)
@command_socket_server_seq_no = -1
@command_socket_outgoing_seq = 0
@receive_thread = start_receive_thread
end
|
#stop ⇒ Object
178
179
180
181
182
183
184
|
# File 'lib/pushy_client/protocol_handler.rb', line 178
def stop
@socket_lock.synchronize do
@receive_socket_lock.synchronize do
internal_stop
end
end
end
|