Class: PushyClient::ProtocolHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pushy_client/protocol_handler.rb

Defined Under Namespace

Classes: TimeRecvWrapper, TimeSendWrapper

Constant Summary collapse

MAX_BODY_SIZE =

The maximum size, in bytes, allowed for a message body. This is not configurable because it is configurable (though not documented) on the server, and we don’t want to make users have to sync the two values. The max on the server is actually 65536, but we leave a little room since the server is measuring the signed message and we’re just counting the size of the stderr and stdout.

63000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ ProtocolHandler

Returns a new instance of ProtocolHandler.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pushy_client/protocol_handler.rb', line 67

def initialize(client)
  @client = client
  # We synchronize on this when we change the socket (so if you want a
  # valid socket to send or receive on, synchronize on this)
  @socket_lock = Mutex.new
  # This holds the same purpose, but receive blocks for a while so it gets
  # its own lock to avoid blocking sends.  reconfigure will take both locks.
  @receive_socket_lock = Mutex.new

  # When the server goes down, close and reopen sockets.
  client.on_server_availability_change do |available|
    if !available
      Thread.new do
        begin
          Chef::Log.info "[#{node_name}] Closing and reopening sockets since server is down ..."
          reconfigure
          Chef::Log.info "[#{node_name}] Done closing and reopening sockets."
        rescue
          client.log_exception("Error reconfiguring sockets when server went down", $!)
        end
      end
    end
  end
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



92
93
94
# File 'lib/pushy_client/protocol_handler.rb', line 92

def client
  @client
end

#client_private_keyObject (readonly)

Returns the value of attribute client_private_key.



98
99
100
# File 'lib/pushy_client/protocol_handler.rb', line 98

def client_private_key
  @client_private_key
end

#command_addressObject (readonly)

Returns the value of attribute command_address.



94
95
96
# File 'lib/pushy_client/protocol_handler.rb', line 94

def command_address
  @command_address
end

#server_heartbeat_addressObject (readonly)

Returns the value of attribute server_heartbeat_address.



93
94
95
# File 'lib/pushy_client/protocol_handler.rb', line 93

def server_heartbeat_address
  @server_heartbeat_address
end

#server_public_keyObject (readonly)

Returns the value of attribute server_public_key.



95
96
97
# File 'lib/pushy_client/protocol_handler.rb', line 95

def server_public_key
  @server_public_key
end

#session_keyObject (readonly)

Returns the value of attribute session_key.



96
97
98
# File 'lib/pushy_client/protocol_handler.rb', line 96

def session_key
  @session_key
end

#session_methodObject (readonly)

Returns the value of attribute session_method.



97
98
99
# File 'lib/pushy_client/protocol_handler.rb', line 97

def session_method
  @session_method
end

Instance Method Details

#node_nameObject



100
101
102
# File 'lib/pushy_client/protocol_handler.rb', line 100

def node_name
  client.node_name
end

#reconfigureObject



182
183
184
185
186
187
188
189
# File 'lib/pushy_client/protocol_handler.rb', line 182

def reconfigure
  @socket_lock.synchronize do
    @receive_socket_lock.synchronize do
      internal_stop
      start # Start picks up new configuration
    end
  end
end

#send_command(message_type, job_id, params) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/pushy_client/protocol_handler.rb', line 191

def send_command(message_type, job_id, params)
  Chef::Log.debug("[#{node_name}] Sending command #{message_type} for job #{job_id}")
  # Nil params will be stripped by JSON.generate()
  message = {
    :node => node_name,
    :client => client.hostname,
    :protocol_version => PushyClient::PROTOCOL_VERSION,
    :org => client.org_name,
    :type => message_type,
    :sequence => -1,
    :timestamp => TimeSendWrapper.now.httpdate,
    :incarnation_id => client.incarnation_id,
    :job_id => job_id
  }.merge(validate_params(params))

  send_signed_json_command(:hmac_sha256, message)
end

#send_heartbeat(sequence) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/pushy_client/protocol_handler.rb', line 209

def send_heartbeat(sequence)
  Chef::Log.debug("[#{node_name}] Sending heartbeat (sequence ##{sequence})")
  job_state = client.job_state
  message = {
    :node => node_name,
    :client => client.hostname,
    :protocol_version => PushyClient::PROTOCOL_VERSION,
    :org => client.org_name,
    :type => :heartbeat,
    :sequence => -1,
    :timestamp => TimeSendWrapper.now.httpdate,
    :incarnation_id => client.incarnation_id,
    :job_state => job_state[:state],
    :job_id => job_state[:job_id]
  }

  send_signed_json_command(:hmac_sha256, message)
end

#startObject



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/pushy_client/protocol_handler.rb', line 104

def start
  server_address = URI(client.config['push_jobs']['heartbeat']['out_addr']).host
  check_server_address(node_name, server_address)

  @server_heartbeat_address = client.config['push_jobs']['heartbeat']['out_addr']
  @command_address = client.config['push_jobs']['heartbeat']['command_addr']
  @server_public_key = OpenSSL::PKey::RSA.new(client.config['public_key'])
  @client_private_key = ProtocolHandler::load_key(client.client_key)
  @max_message_skew = client.config['max_message_skew']

  if client.using_curve
    server_curve_pub_key = client.config['curve_public_key']

    # decode and extract session key
    begin
      @session_method = client.config['encoded_session_key']['method']
      enc_session_key = Base64::decode64(client.config['encoded_session_key']['key'])
    @session_key = @client_private_key.private_decrypt(enc_session_key)
    rescue =>_
      Chef::Log.error "[#{node_name}] No session key found in config"
      exit(-1)
    end
  else
    # decode and extract session key
    begin
      @session_method = client.config['encoded_session_key']['method']
      enc_session_key = Base64::decode64(client.config['encoded_session_key']['key'])
      @session_key = @client_private_key.private_decrypt(enc_session_key)
    rescue =>e
      Chef::Log.error "[#{node_name}] No session key found in config"
      exit(-1)
    end
  end

  Chef::Log.info "[#{node_name}] Starting ZMQ version #{LibZMQ.version}"

  # Server heartbeat socket
  Chef::Log.info "[#{node_name}] Listening for server heartbeat at #{@server_heartbeat_address}"
  @server_heartbeat_socket = PushyClient::ZmqContext.socket(ZMQ::SUB)
  @server_heartbeat_socket.connect(@server_heartbeat_address)
  @server_heartbeat_socket.setsockopt(ZMQ::SUBSCRIBE, "")
  @server_heartbeat_seq_no = -1

  # Command socket
  Chef::Log.info "[#{node_name}] Connecting to command channel at #{@command_address}"
  # TODO
  # This needs to be set up to be able to handle bidirectional messages; right now this is Tx only
  # Probably need to set it up with a handler, like the subscriber socket above.
  @command_socket = PushyClient::ZmqContext.socket(ZMQ::DEALER)
  @command_socket.setsockopt(ZMQ::LINGER, 0)
  # Note setting this to '1' causes the client to crash on send, but perhaps that
  # beats storming the server when the server restarts
  @command_socket.setsockopt(ZMQ::RCVHWM, 0)
  # Buffering more than a few heartbeats can cause trauma on the server after restart
  @command_socket.setsockopt(ZMQ::SNDHWM, 3)

  if client.using_curve
    @command_socket.setsockopt(ZMQ::CURVE_SERVERKEY, server_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_PUBLICKEY, client.client_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_SECRETKEY, client.client_curve_sec_key)
  end

  @command_socket.connect(@command_address)
  @command_socket_server_seq_no = -1

  @command_socket_outgoing_seq = 0

  @receive_thread = start_receive_thread
end

#stopObject



174
175
176
177
178
179
180
# File 'lib/pushy_client/protocol_handler.rb', line 174

def stop
  @socket_lock.synchronize do
    @receive_socket_lock.synchronize do
      internal_stop
    end
  end
end