Class: PDTP::Protocol

Inherits:
LengthPrefixProtocol show all
Defined in:
lib/pdtp/common/protocol.rb

Overview

EventMachine handler class for the PDTP protocol

Direct Known Subclasses

Client::Connection, Server::Connection

Defined Under Namespace

Classes: Optional

Constant Summary collapse

@@num_connections =
0
@@message_params =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from LengthPrefixProtocol

#prefix_size=, #receive_data, #send_packet

Constructor Details

#initialize(*args) ⇒ Protocol

Returns a new instance of Protocol.



54
55
56
57
# File 'lib/pdtp/common/protocol.rb', line 54

# Sets up a new protocol handler.
#
# Initialises the user_data slot to nil and then forwards every argument,
# unchanged, to the superclass constructor.
#
# @param args [Array] constructor arguments, passed through to super untouched
def initialize(*args)
  # This has to be the instance variable: a bare `user_data = nil` only
  # creates a local that is thrown away when the method returns.
  @user_data = nil
  super
end

Instance Attribute Details

#user_data ⇒ Object

users of this class may store arbitrary data here



75
76
77
# File 'lib/pdtp/common/protocol.rb', line 75

# Reader for the slot in which users of this class may keep arbitrary data.
#
# @return [Object, nil] the current value of @user_data
def user_data
  return @user_data
end

Class Method Details

.define_message_params ⇒ Object

this function defines the required fields for each message



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/pdtp/common/protocol.rb', line 245

# Builds the table describing which parameters each protocol message carries.
#
# Keys are message names. Each value maps a parameter name to a type symbol,
# or to an Optional wrapping the type symbol.
#
# @return [Hash] a freshly built table on every call
def self.define_message_params
  # request / provide / unrequest / unprovide share one shape; a new hash and
  # a new Optional are built for each of them.
  url_with_optional_range = lambda do
    { "url" => :url, "range" => Optional.new(:range) }
  end

  # protocol_error / protocol_warn share one shape as well.
  optional_text = lambda do
    { "message" => Optional.new(:string) }
  end

  {
    #must be the first message the client sends
    "register" => {
      "client_id" => :string,
      "listen_port" => :int
    },

    "ask_info" => {
      "url" => :url
    },

    "tell_info" => {
      "url" => :url,
      "size" => Optional.new(:int),
      "chunk_size" => Optional.new(:int),
      "streaming" => Optional.new(:bool)
    },

    "ask_verify" => {
      "peer" => :ip,
      "url" => :url,
      "range" => :range,
      "peer_id" => :string
    },

    "tell_verify" => {
      "peer" => :ip,
      "url" => :url,
      "range" => :range,
      "peer_id" => :string,
      "authorized" => :bool
    },

    "request" => url_with_optional_range.call,
    "provide" => url_with_optional_range.call,
    "unrequest" => url_with_optional_range.call,
    "unprovide" => url_with_optional_range.call,

    #the taker sends this message when a transfer finishes
    #if there is an error in the transfer, dont set a hash
    #to signify failure
    #when this is received from the taker, the connection is considered done for all parties
    #
    #The giver also sends this message when they are done transferring.
    #this closes the connection on their side, allowing them to start other transfers
    #It leaves the connection open on the taker side to allow them to decide if the transfer was successful
    #the hash parameter is ignored when sent by the giver
    "completed" => {
      #"peer"=>:ip, no longer used
      "url" => :url,
      "range" => :range,
      "peer_id" => :string,
      "hash" => Optional.new(:string)
    },

    "hash_verify" => {
      "url" => :url,
      "range" => :range,
      "hash_ok" => :bool
    },

    "transfer" => {
      "host" => :string,
      "port" => :int,
      "method" => :string,
      "url" => :url,
      "range" => :range,
      "peer_id" => :string
    },

    "protocol_error" => optional_text.call,
    "protocol_warn" => optional_text.call
  }
end

.obj_matches_type?(obj, type) ⇒ Boolean

Returns whether or not a given Ruby object matches the specified type. Available types: :url, :range, :ip, :int, :bool, :string

Returns:

  • (Boolean)


229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/pdtp/common/protocol.rb', line 229

# Tells whether a Ruby object is an acceptable value for a protocol field type.
#
# Supported types: :url, :range, :ip, :int, :bool, :string
#
# @param obj [Object] the value to check
# @param type [Symbol] one of the supported type names
# @return [Boolean] true when obj is a valid value for type
# @raise [RuntimeError] when type is not one of the supported names
def self.obj_matches_type?(obj, type)
  case type
  when :url, :string then obj.class == String
  when :range then obj.class == Range || obj.class == Hash
  # Integer covers every integral value. Comparing against Fixnum rejected
  # large values on Rubies that still had a separate Bignum class, and the
  # Fixnum constant no longer exists on current Rubies.
  when :int then obj.is_a?(Integer)
  when :bool then obj == true || obj == false
  when :ip
    # IPAddr.new raises for anything it cannot interpret as an address;
    # any such failure simply means "not an IP".
    begin
      IPAddr.new(obj)
      true
    rescue StandardError
      false
    end
  else
    raise "Invalid type specified: #{type}"
  end
end


180
181
182
# File 'lib/pdtp/common/protocol.rb', line 180

# Debug helper: prints the current value of the class-wide connection
# counter to standard output.
def self.print_info
  open_count = @@num_connections
  puts "num_connections=#{open_count}"
end

.validate_message(message) ⇒ Object

Makes sure that the message is valid; if it is not, raises a ProtocolError.

Raises:



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/pdtp/common/protocol.rb', line 196

# Checks that a decoded message is well formed.
#
# A message is a two element array: the command name followed by an options
# hash. Every parameter listed for the command in the message table must be
# present and of the declared type, unless it is wrapped in Optional, in which
# case it is only type-checked when present.
#
# @param message [Array] the decoded message, [command, options]
# @raise [ProtocolError] if the message is not an array, has no options hash,
#   names an unknown command, lacks a required parameter, or carries a
#   parameter of the wrong type
def self.validate_message(message)
  raise ProtocolError.new("Message is not a JSON array") unless message.is_a? Array
  command, options = message

  # Without this check a message such as ["register"] (no options) or
  # ["register", 5] blows up on options[name] with NoMethodError/TypeError,
  # which is not reported to the peer as a protocol error.
  unless options.is_a?(Hash)
    raise ProtocolError.new("Message options are not a JSON object for message: '#{command}'")
  end

  # The parameter table is built once, on first use, and then reused.
  @@message_params ||= define_message_params

  params = @@message_params[command] rescue nil
  raise ProtocolError.new("Invalid message type: #{command}") if params.nil?

  params.each do |name, type|
    value = options[name]

    if type.class == Optional
      next if value.nil? #dont worry about it if they dont have this param
      type = type.type #grab the real type from within the optional class
    end

    raise ProtocolError.new("required parameter: '#{name}' missing for message: '#{command}'") if value.nil?
    unless obj_matches_type?(value, type)
      raise ProtocolError.new("parameter: '#{name}' val='#{value}' is not of type: '#{type}' for message: '#{command}' ")
    end
  end
end

Instance Method Details

#connection_open? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/pdtp/common/protocol.rb', line 50

# Reports whether this connection is currently flagged as open.
#
# The flag is the @connection_open instance variable. It is unset (nil) until
# something assigns it, so the value is coerced to a strict boolean here.
#
# @return [Boolean] true if the flag is set to a truthy value, false otherwise
def connection_open?
  !!@connection_open
end

#error_close_connection(error) ⇒ Object

close a connection, but first send the specified error message



78
79
80
81
82
83
84
85
# File 'lib/pdtp/common/protocol.rb', line 78

# Closes the connection because of an error.
#
# When PROTOCOL_DEBUG is set, a protocol_error message carrying the error text
# is sent first and close_connection is called with true; otherwise
# close_connection is called with no arguments and nothing is sent.
#
# @param error [Object] the error text to report to the peer in debug mode
# @return whatever close_connection returns
def error_close_connection(error)
  return close_connection unless PROTOCOL_DEBUG

  send_message :protocol_error, :message => error
  close_connection true # close after writing
end

#get_peer_info ⇒ Object

returns the ip address and port in an array [ip, port]



185
186
187
# File 'lib/pdtp/common/protocol.rb', line 185

# Returns the peer address information cached in @cached_peer_info.
#
# @return [Object, nil] the cached value; nil if nothing has been cached yet
def get_peer_info
  return @cached_peer_info
end

#hash_to_range(command, message) ⇒ Object

converts a PDTP protocol min and max hash to a Ruby Range class



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/pdtp/common/protocol.rb', line 140

# Replaces the "range" entry of a message, given as a hash with optional
# "min" and "max" keys, by the equivalent Ruby Range. The message is
# modified in place.
#
# For the "provide" and "request" commands a missing range is treated as an
# empty hash, i.e. the entire file (0..-1). A missing "min" becomes 0 and a
# missing "max" becomes -1.
#
# @param command [String] the message name
# @param message [Hash] the message options; updated in place
# @return [Range, nil] the range stored under "range", or nil if there is none
# @raise [RuntimeError] if the "range" entry is present but is not a Hash
def hash_to_range(command, message)
  key = "range"

  # these types assume a range (the entire file) if it isnt specified
  message[key] = {} if %w[provide request].include?(command) && message[key].nil?

  bounds = message[key]
  return unless bounds

  raise unless bounds.class == Hash
  message[key] = (bounds["min"] || 0)..(bounds["max"] || -1)
end

#post_init ⇒ Object

called by EventMachine after a connection has been established



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pdtp/common/protocol.rb', line 60

# Connection set-up hook.
#
# Caches the peer's address and port, increments the class-wide connection
# counter, marks the connection as open, and finally invokes
# connection_created if this object responds to it.
def post_init
  # a cache of the peer info because eventmachine seems to drop it before we want
  sockaddr = get_peername

  @cached_peer_info =
    if sockaddr.nil?
      # no peer name available: store a placeholder host string and port
      ["<Peername nil!!!>", 91119]
    else
      port, addr = Socket.unpack_sockaddr_in(sockaddr)
      [addr.to_s, port.to_i]
    end

  @@num_connections += 1
  @connection_open = true
  connection_created if respond_to?(:connection_created)
end

#range_to_hash(message) ⇒ Object

Converts Ruby Range objects in the message to PDTP protocol hashes with "min" and "max" keys. Examples: 0..-1 => nil, i.e. the key is removed (entire file); 10..-1 => {"min"=>10} (contents of file >= 10).



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/pdtp/common/protocol.rb', line 125

# Rewrites, in place, every Range value of the message as a protocol hash.
#
#   0..-1  => entry removed (entire file)
#   n..-1  => {"min"=>n}
#   n..m   => {"min"=>n, "max"=>m}
#
# Values that are not Ranges are left untouched.
#
# @param message [Hash] the message options; modified in place
# @return [Hash] the same message object
def range_to_hash(message)
  # walk a snapshot of the keys, since entries may be removed along the way
  message.keys.each do |field|
    span = message[field]
    next unless span.class == Range

    if span == (0..-1)
      message.delete(field)
    elsif span.last == -1
      message[field] = { "min" => span.first }
    else
      message[field] = { "min" => span.first, "max" => span.last }
    end
  end

  message
end

#receive_packet(packet) ⇒ Object

Called for each packet of data received over the wire. Parses the JSON message and dispatches it.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pdtp/common/protocol.rb', line 95

# Handles one packet of data: parses it as JSON, validates it, converts the
# range hash to a Ruby Range, and hands the message to receive_message if
# this object responds to it.
#
# Error handling:
# * ProtocolError - logged to STDERR, then the connection is closed through
#   error_close_connection
# * ProtocolWarn  - reported back to the peer as a protocol_warn message
# * any other StandardError - logged to STDERR
#
# Exceptions outside StandardError (Interrupt, SystemExit, ...) are not
# rescued here and propagate to the caller.
#
# @param packet [String] the raw packet; it is not modified
def receive_packet(packet)
  # Work on a copy: chomp! would alter the caller's buffer and raises on a
  # frozen string.
  packet = packet.chomp
  #@@log.debug "(#{remote_peer_id}) recv: " + packet

  # Any parse failure is reported below as a protocol error.
  message = begin
    JSON.parse(packet)
  rescue StandardError
    nil
  end
  raise ProtocolError.new("JSON couldn't parse: #{packet}") if message.nil?
  Protocol.validate_message message

  command, options = message
  hash_to_range command, options
  receive_message(command, options) if respond_to? :receive_message
rescue ProtocolError => e
  # FIXME Should likely be raised and handled higher
  STDERR.write "(#{remote_peer_id}) PROTOCOL ERROR: #{e.to_s}\n"
  STDERR.write e.backtrace.join("\n") + "\n"
  error_close_connection e.to_s
rescue ProtocolWarn => e
  send_message :protocol_warn, :message => e.to_s
rescue StandardError => e
  # Deliberately StandardError rather than Exception, so that interrupts and
  # exit requests are not swallowed by the packet handler.
  # FIXME Should likely be raised and handled higher
  STDERR.write "(#{remote_peer_id}) UNKNOWN EXCEPTION #{e.to_s}\n"
  STDERR.write e.backtrace.join("\n") + "\n"
end

#remote_peer_id ⇒ Object

debug routine: returns id of remote peer on this connection



88
89
90
91
# File 'lib/pdtp/common/protocol.rb', line 88

# Debug routine: identifies the remote peer on this connection.
#
# Asks user_data for its client_id. If that lookup raises, or yields nil or
# false, the placeholder 'NOID' is returned instead.
#
# @return [Object] the client id, or 'NOID' when none is available
def remote_peer_id
  peer_id =
    begin
      user_data.client_id
    rescue StandardError
      nil
    end

  peer_id || 'NOID'
end

#send_message(command, opts = {}) ⇒ Object

sends a message, in the internal Hash format, over the wire



158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/pdtp/common/protocol.rb', line 158

# Sends a message, given in the internal Hash format, over the wire.
#
# The wire format is a JSON array whose first entry is the command name (as a
# string) and whose second entry is the options object, followed by a newline.
# Option keys are converted to strings and Range values are converted by
# range_to_hash. The caller's opts hash is left untouched because a copy is
# built before any conversion.
#
# @param command [#to_s] the message name
# @param opts [Hash] the message options
# @return whatever send_packet returns
def send_message(command, opts = {})
  # Stringify all option keys (into a new hash)
  wire_opts = opts.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }

  # Convert all Ruby ranges to JSON objects representing them
  range_to_hash wire_opts

  # JSON.generate is the documented entry point; JSON.unparse was only a
  # legacy alias for it.
  send_packet JSON.generate([command.to_s, wire_opts]) + "\n"
end

#to_s ⇒ Object



189
190
191
192
# File 'lib/pdtp/common/protocol.rb', line 189

# Renders the connection as "address:port", using the first two elements of
# the value returned by get_peer_info.
#
# @return [String]
def to_s
  host, port_number = get_peer_info
  format("%s:%s", host, port_number)
end

#unbind ⇒ Object

called by EventMachine when a connection is closed



174
175
176
177
178
# File 'lib/pdtp/common/protocol.rb', line 174

# Connection tear-down hook.
#
# Decrements the class-wide connection counter, invokes connection_destroyed
# if this object responds to it, and only then marks the connection as closed.
def unbind
  @@num_connections = @@num_connections - 1
  connection_destroyed if respond_to?(:connection_destroyed)
  # cleared last, i.e. after the connection_destroyed callback has run
  @connection_open = false
end