Class: PushyClient::ProtocolHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pushy_client/protocol_handler.rb

Defined Under Namespace

Classes: TimeRecvWrapper, TimeSendWrapper

Constant Summary collapse

MAX_BODY_SIZE =

The maximum size, in bytes, allowed for a message body. This is not configurable on the client because the limit is already configurable (though undocumented) on the server, and we don’t want to make users keep the two values in sync. The server’s actual maximum is 65536, but we leave a little room because the server measures the signed message, whereas we only count the size of stderr and stdout.

63000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ ProtocolHandler

Returns a new instance of ProtocolHandler.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/pushy_client/protocol_handler.rb', line 71

# Creates a protocol handler bound to +client+ and registers a callback that
# rebuilds the sockets whenever the server is reported unavailable.
#
# @param client [Object] owning client; it is asked for
#   on_server_availability_change here and for log_exception if the
#   background reconfigure fails
def initialize(client)
  @client = client

  # Guards changes to the sockets: hold this lock to be sure the socket you
  # send on is valid.
  @socket_lock = Mutex.new
  # Same role for the receive side. Receiving blocks for a while, so it has a
  # lock of its own to keep sends from stalling; reconfigure acquires both.
  @receive_socket_lock = Mutex.new

  # A "server went down" notification triggers a close-and-reopen of the
  # sockets on a separate thread; "server is up" notifications are ignored.
  client.on_server_availability_change do |available|
    next if available

    Thread.new do
      begin
        Chef::Log.info "[#{node_name}] Closing and reopening sockets since server is down ..."
        reconfigure
        Chef::Log.info "[#{node_name}] Done closing and reopening sockets."
      rescue => error
        client.log_exception("Error reconfiguring sockets when server went down", error)
      end
    end
  end
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



96
97
98
# File 'lib/pushy_client/protocol_handler.rb', line 96

# Reader for the client object this handler was constructed with.
#
# @return [Object] the current value of @client
def client; @client; end

#client_private_key ⇒ Object (readonly)

Returns the value of attribute client_private_key.



102
103
104
# File 'lib/pushy_client/protocol_handler.rb', line 102

# Reader for the client's private key (assigned by start).
#
# @return [Object, nil] the current value of @client_private_key
def client_private_key; @client_private_key; end

#command_address ⇒ Object (readonly)

Returns the value of attribute command_address.



98
99
100
# File 'lib/pushy_client/protocol_handler.rb', line 98

# Reader for the command channel address (assigned by start from the
# client configuration).
#
# @return [Object, nil] the current value of @command_address
def command_address; @command_address; end

#server_heartbeat_address ⇒ Object (readonly)

Returns the value of attribute server_heartbeat_address.



97
98
99
# File 'lib/pushy_client/protocol_handler.rb', line 97

# Reader for the address the server heartbeat is received on (assigned by
# start from the client configuration).
#
# @return [Object, nil] the current value of @server_heartbeat_address
def server_heartbeat_address; @server_heartbeat_address; end

#server_public_key ⇒ Object (readonly)

Returns the value of attribute server_public_key.



99
100
101
# File 'lib/pushy_client/protocol_handler.rb', line 99

# Reader for the server's public key (assigned by start).
#
# @return [Object, nil] the current value of @server_public_key
def server_public_key; @server_public_key; end

#session_key ⇒ Object (readonly)

Returns the value of attribute session_key.



100
101
102
# File 'lib/pushy_client/protocol_handler.rb', line 100

# Reader for the decrypted session key (assigned by start).
#
# @return [Object, nil] the current value of @session_key
def session_key; @session_key; end

#session_method ⇒ Object (readonly)

Returns the value of attribute session_method.



101
102
103
# File 'lib/pushy_client/protocol_handler.rb', line 101

# Reader for the session method named in the client configuration
# (assigned by start).
#
# @return [Object, nil] the current value of @session_method
def session_method; @session_method; end

Instance Method Details

#node_name ⇒ Object



104
105
106
# File 'lib/pushy_client/protocol_handler.rb', line 104

# Convenience accessor that forwards to the owning client.
#
# @return [Object] whatever client.node_name returns
def node_name
  owner = client
  owner.node_name
end

#reconfigure ⇒ Object



186
187
188
189
190
191
192
193
# File 'lib/pushy_client/protocol_handler.rb', line 186

# Stops and restarts the handler while holding both socket locks, so no
# sender or receiver works with a socket that is being replaced.
# The locks are taken in the order @socket_lock, then @receive_socket_lock.
#
# @return [Object] whatever start returns
def reconfigure
  restart = lambda do
    internal_stop
    # start reads the client configuration again, so new settings apply here
    start
  end

  @socket_lock.synchronize { @receive_socket_lock.synchronize(&restart) }
end

#send_command(message_type, job_id, params) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/pushy_client/protocol_handler.rb', line 195

# Builds a command message for the given job and hands it to
# send_signed_json_command, signed with :hmac_sha256.
#
# @param message_type [Object] value placed in the message's :type field
# @param job_id [Object] value placed in the message's :job_id field
# @param params [Object] extra fields; passed through validate_params and
#   merged over the standard fields
# @return [Object] whatever send_signed_json_command returns
def send_command(message_type, job_id, params)
  Chef::Log.debug("[#{node_name}] Sending command #{message_type} for job #{job_id}")

  # Standard fields shared by every command. :sequence is always -1 here.
  # NOTE(review): the previous comment said nil params are stripped by
  # JSON.generate(); that is not visible in this method - confirm where
  # nil values are dropped.
  envelope = {
    node: node_name,
    client: client.hostname,
    protocol_version: PushyClient::PROTOCOL_VERSION,
    org: client.org_name,
    type: message_type,
    sequence: -1,
    timestamp: TimeSendWrapper.now.httpdate,
    incarnation_id: client.incarnation_id,
    job_id: job_id
  }
  payload = envelope.merge(validate_params(params))

  send_signed_json_command(:hmac_sha256, payload)
end

#send_heartbeat(sequence) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/pushy_client/protocol_handler.rb', line 213

# Builds a heartbeat message carrying the client's current job state and
# hands it to send_signed_json_command, signed with :hmac_sha256.
#
# @param sequence [Object] only used in the debug log line of this method;
#   the message's :sequence field is always set to -1 here
# @return [Object] whatever send_signed_json_command returns
def send_heartbeat(sequence)
  Chef::Log.debug("[#{node_name}] Sending heartbeat (sequence ##{sequence})")

  # Read the job state once so :job_state and :job_id come from the same value
  current_job = client.job_state
  heartbeat = {
    node: node_name,
    client: client.hostname,
    protocol_version: PushyClient::PROTOCOL_VERSION,
    org: client.org_name,
    type: :heartbeat,
    sequence: -1,
    timestamp: TimeSendWrapper.now.httpdate,
    incarnation_id: client.incarnation_id,
    job_state: current_job[:state],
    job_id: current_job[:job_id]
  }

  send_signed_json_command(:hmac_sha256, heartbeat)
end

#start ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/pushy_client/protocol_handler.rb', line 108

# Reads connection settings and keys from the client configuration, decodes
# the session key, opens the server heartbeat (SUB) and command (DEALER)
# sockets, and starts the receive thread.
#
# Terminates the process with exit(-1) if the session key cannot be read from
# the configuration or decrypted with the client's private key.
#
# @return [Object] the value returned by start_receive_thread (also stored in
#   @receive_thread)
def start
  server_address = URI(client.config['push_jobs']['heartbeat']['out_addr']).host
  check_server_address(node_name, server_address)

  @server_heartbeat_address = client.config['push_jobs']['heartbeat']['out_addr']
  @command_address = client.config['push_jobs']['heartbeat']['command_addr']
  @server_public_key = OpenSSL::PKey::RSA.new(client.config['public_key'])
  @client_private_key = ProtocolHandler.load_key(client.client_key)
  @max_message_skew = client.config['max_message_skew']

  # The server's curve key is only read when CURVE is in use; it is applied
  # to the command socket further down.
  server_curve_pub_key = client.config['curve_public_key'] if client.using_curve

  # Decode and extract the session key. This is the same whether or not CURVE
  # is enabled, so it is done once here.
  begin
    @session_method = client.config['encoded_session_key']['method']
    enc_session_key = Base64.decode64(client.config['encoded_session_key']['key'])
    @session_key = @client_private_key.private_decrypt(enc_session_key)
  rescue => e
    Chef::Log.error "[#{node_name}] No session key found in config"
    # The error line above is also emitted for decode/decrypt failures, so
    # record the real cause for anyone debugging.
    Chef::Log.debug "[#{node_name}] Session key decode failed: #{e.class}: #{e.message}"
    exit(-1)
  end

  Chef::Log.info "[#{node_name}] Starting ZMQ version #{LibZMQ.version}"

  # Server heartbeat socket
  Chef::Log.info "[#{node_name}] Listening for server heartbeat at #{@server_heartbeat_address}"
  @server_heartbeat_socket = PushyClient::ZmqContext.socket(ZMQ::SUB)
  @server_heartbeat_socket.connect(@server_heartbeat_address)
  # Empty subscription prefix: receive every message published on the socket
  @server_heartbeat_socket.setsockopt(ZMQ::SUBSCRIBE, "")
  @server_heartbeat_seq_no = -1

  # Command socket
  Chef::Log.info "[#{node_name}] Connecting to command channel at #{@command_address}"
  # TODO
  # This needs to be set up to be able to handle bidirectional messages; right now this is Tx only
  # Probably need to set it up with a handler, like the subscriber socket above.
  @command_socket = PushyClient::ZmqContext.socket(ZMQ::DEALER)
  @command_socket.setsockopt(ZMQ::LINGER, 0)
  # Note setting this to '1' causes the client to crash on send, but perhaps that
  # beats storming the server when the server restarts
  @command_socket.setsockopt(ZMQ::RCVHWM, 0)
  # Buffering more than a few heartbeats can cause trauma on the server after restart
  @command_socket.setsockopt(ZMQ::SNDHWM, 3)

  if client.using_curve
    @command_socket.setsockopt(ZMQ::CURVE_SERVERKEY, server_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_PUBLICKEY, client.client_curve_pub_key)
    @command_socket.setsockopt(ZMQ::CURVE_SECRETKEY, client.client_curve_sec_key)
  end

  @command_socket.connect(@command_address)
  @command_socket_server_seq_no = -1

  @command_socket_outgoing_seq = 0

  @receive_thread = start_receive_thread
end

#stop ⇒ Object



178
179
180
181
182
183
184
# File 'lib/pushy_client/protocol_handler.rb', line 178

# Shuts the handler down via internal_stop while holding both socket locks
# (@socket_lock first, then @receive_socket_lock).
#
# @return [Object] whatever internal_stop returns
def stop
  @socket_lock.synchronize do
    @receive_socket_lock.synchronize { internal_stop }
  end
end