Class: ClientWorldConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/libowl/client_world_connection.rb

Overview

A connection between a client and a world model. This class spawns a thread to handle incoming messages and returns instances of the Response and StepResponse classes to fulfill client requests. If a thread cannot be used then an instance of the class ClientWorldModel should be used instead.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ ClientWorldConnection

Creates a new connection and spawns a thread to call handleMessage automatically.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/libowl/client_world_connection.rb', line 61

# Opens a TCP connection to host:port, exchanges the protocol handshake and
# starts the background thread that keeps calling handleMessage.
#
# host - address handed to TCPSocket.open
# port - port handed to TCPSocket.open
#
# @connected ends up true only when the peer echoed the handshake back byte
# for byte; otherwise a diagnostic is printed and it stays false, in which
# case the listening thread exits immediately.
def initialize(host, port)
  @promise_mutex = Mutex.new
  @cur_key = 0
  @uri_search_keys = []
  @single_response = {}
  @connected = false
  @host = host
  @port = port
  @socket = TCPSocket.open(host, port)
  ver_string = "GRAIL client protocol"
  #The handshake is the length of the message, the protocol string, and the version (0).
  handshake = [ver_string.length].pack('N') + ver_string + "\x00\x00"
  #Send a handshake and then receive one
  @socket.send(handshake, 0)
  inshake = String.new
  while (inshake.length < handshake.length)
    received = @socket.recvfrom(handshake.length - inshake.length)
    chunk = received && received[0]
    #A nil or empty read means the peer closed the connection. Stop reading
    #rather than looping forever; the short reply then fails the comparison.
    break if chunk.nil? || chunk.empty?
    inshake += chunk
  end

  #Compare every byte starting at index 0 (the previous loop started at 1 and
  #ran one past the end, so a corrupt first byte went unnoticed).
  sent_bytes = handshake.unpack('C*')
  echoed_bytes = inshake.unpack('C*')
  mismatch = (0...sent_bytes.length).find { |i| sent_bytes[i] != echoed_bytes[i] }
  @connected = mismatch.nil?
  unless @connected
    puts "Handshake failure!"
    puts "For byte #{mismatch} we sent #{sent_bytes[mismatch]} but got #{echoed_bytes[mismatch].inspect}"
  end

  @alias_to_attr_name = {}
  @alias_to_origin_name = {}
  @next_data = {}
  @request_errors = {}

  #Start the listening thread
  @listen_thread = Thread.new do
    while (@connected)
      handleMessage()
    end
  end
end

Instance Attribute Details

#connectedObject (readonly)

Indicates if this object is successfully connected to a world model.



41
42
43
# File 'lib/libowl/client_world_connection.rb', line 41

# Reader for the @connected flag.
# Returns whatever @connected currently holds.
def connected()
  return @connected
end

Instance Method Details

#cancelRequest(ticket_number) ⇒ Object

Cancel a request with the given ticket number



426
427
428
429
430
431
432
# File 'lib/libowl/client_world_connection.rb', line 426

# Sends a cancel message for the request identified by ticket_number.
#
# ticket_number - ticket of the request to cancel, packed as a 4-byte
#                 big-endian unsigned integer.
#
# Returns whatever @socket.send returns.
def cancelRequest(ticket_number)
  # Message body: one control byte followed by the 4-byte ticket.
  payload = [CANCEL_REQUEST, ticket_number].pack('CN')
  # Frame: 4-byte body length, then the body.
  frame = [payload.length].pack('N') + payload
  @socket.send(frame, 0)
end

#closeObject

Close this connection



106
107
108
109
# File 'lib/libowl/client_world_connection.rb', line 106

# Closes this connection.
#
# The @connected flag is cleared before the socket is touched, so the
# listening loop (which runs while @connected is true) sees the shutdown even
# if closing the socket raises. Calling close again on an already closed
# socket is a no-op.
#
# Returns false.
def close()
  @connected = false
  @socket.close() unless @socket.closed?
  false
end

#decodeAttributeAlias(inbuff) ⇒ Object

Decode attribute alias message



240
241
242
243
244
245
246
247
248
249
# File 'lib/libowl/client_world_connection.rb', line 240

# Decodes an attribute alias message and records each alias in
# @alias_to_attr_name.
#
# inbuff - message body: a 4-byte big-endian alias count followed by that
#          many entries, each a 4-byte alias number and a name that
#          splitURIFromRest peels off the front of the buffer.
def decodeAttributeAlias(inbuff)
  alias_count = inbuff.unpack('N').first
  remaining = inbuff[4..-1]
  (1..alias_count).each do
    alias_id = remaining.unpack('N').first
    # splitURIFromRest hands back the decoded name plus the unread tail.
    attr_name, remaining = splitURIFromRest(remaining[4..-1])
    @alias_to_attr_name[alias_id] = attr_name
  end
end

#decodeDataResponse(inbuff) ⇒ Object

Decode a data response message, returning a single WMData object that holds the object URI, its decoded attributes, and the ticket number.



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/libowl/client_world_connection.rb', line 285

# Decodes a data response message into one WMData object.
#
# inbuff - message body: the object URI (split off by splitURIFromRest),
#          a 4-byte ticket, a 4-byte attribute count, then that many
#          attribute records.
#
# Each attribute record is laid out as: 4-byte name alias, 8-byte creation
# date, 8-byte expiration date, 4-byte origin alias, 4-byte data length, and
# the data bytes. Aliases are resolved through @alias_to_attr_name and
# @alias_to_origin_name.
#
# Returns WMData.new(object_uri, attributes, ticket).
def decodeDataResponse(inbuff)
  object_uri, remaining = splitURIFromRest(inbuff)
  ticket = remaining.unpack('N').first
  total_attributes = remaining[4..-1].unpack('N').first
  remaining = remaining[8..-1]
  attributes = (1..total_attributes).map do
    name_alias = remaining.unpack('N').first
    creation_date = unpackuint64(remaining[4..-1])
    expiration_date = unpackuint64(remaining[12..-1])
    origin_alias = remaining[20..-1].unpack('N').first
    data_len = remaining[24..-1].unpack('N').first
    data = remaining[28, data_len]
    # Step past the 28-byte fixed header plus this attribute's payload.
    remaining = remaining[(28 + data_len)..-1]
    WMAttribute.new(@alias_to_attr_name[name_alias], data, creation_date,
                    expiration_date, @alias_to_origin_name[origin_alias])
  end
  WMData.new(object_uri, attributes, ticket)
end

#decodeOriginAlias(inbuff) ⇒ Object

Decode origin alias message



252
253
254
255
256
257
258
259
260
261
# File 'lib/libowl/client_world_connection.rb', line 252

# Decodes an origin alias message and records each alias in
# @alias_to_origin_name.
#
# inbuff - message body: a 4-byte big-endian alias count followed by that
#          many entries, each a 4-byte alias number and a name that
#          splitURIFromRest peels off the front of the buffer.
def decodeOriginAlias(inbuff)
  entry_count = inbuff.unpack('N').first
  tail = inbuff[4..-1]
  (1..entry_count).each do
    alias_number = tail.unpack('N').first
    # The helper returns the decoded name and whatever bytes follow it.
    origin_name, tail = splitURIFromRest(tail[4..-1])
    @alias_to_origin_name[alias_number] = origin_name
  end
end

#decodeTicketMessage(inbuff) ⇒ Object

Decode a ticket message or a request complete message.



265
266
267
# File 'lib/libowl/client_world_connection.rb', line 265

# Decodes a ticket message or a request complete message.
#
# inbuff - message body; its first four bytes are read as a big-endian
#          unsigned 32-bit ticket number.
#
# Returns the ticket number, or nil when fewer than four bytes are present.
def decodeTicketMessage(inbuff)
  inbuff.unpack('N').first
end

#decodeURIResponse(inbuff) ⇒ Object

Decode a URI response message, returning an array of the URI names it contains.



271
272
273
274
275
276
277
278
279
280
281
# File 'lib/libowl/client_world_connection.rb', line 271

# Decodes a URI response message into the list of names it contains.
#
# inbuff - message body, or nil. Names are peeled off the front one at a
#          time with splitURIFromRest for as long as more than four bytes
#          remain.
#
# Returns an array of the decoded names (empty when inbuff is nil).
def decodeURIResponse(inbuff)
  return [] if inbuff.nil?

  uris = []
  remaining = inbuff
  until remaining.length <= 4
    uri, remaining = splitURIFromRest(remaining)
    uris << uri
  end
  uris
end

#getError(key) ⇒ Object

Get the error recorded for the request with the given key. Raises an exception if there is no error for that key, so check hasError first.



229
230
231
232
233
234
235
236
237
# File 'lib/libowl/client_world_connection.rb', line 229

# Fetches the error stored for the request identified by key.
#
# key - ticket number of the request.
#
# Returns the entry from @request_errors for that key.
# Raises RuntimeError when hasError(key) is false.
def getError(key)
  raise "no error but getError called" unless hasError(key)

  @promise_mutex.synchronize { @request_errors[key] }
end

#getNext(key) ⇒ Object

Get the data from the response object corresponding to this key. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/libowl/client_world_connection.rb', line 199

# Removes and returns the next piece of data queued for the request
# identified by key.
#
# key - ticket number of the request.
#
# The availability check, the removal of the value, and the cleanup of a
# finished request all happen inside one @promise_mutex critical section.
# Previously the entries in @request_errors and @next_data were deleted
# without holding the mutex, while the listening thread updates those same
# hashes under it.
#
# Returns the oldest queued value for the request.
# Raises RuntimeError ("No next value in request") when the request is
# unknown or has no completed value waiting (same condition as hasNext).
def getNext(key)
  @promise_mutex.synchronize do
    pending = @next_data[key]
    raise "No next value in request" if pending.nil? || pending.length <= 1

    data = pending.shift
    #A trailing nil marks the request as complete. Once the last real value
    #has been handed out, delete the request's associated data.
    if pending[-1].nil?
      @request_errors.delete key
      @next_data.delete key
    end
    data
  end
end

#handleMessageObject

Handle a message of currently unknown type. This is automatically handled by a thread spawned at object creation.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/libowl/client_world_connection.rb', line 114

# Reads one length-prefixed message from @socket and dispatches it by its
# control byte. Called repeatedly by the thread spawned at object creation.
#
# Returns the control byte of the processed message, or nil when the peer
# closed the connection (in which case @connected is set to false so the
# listening loop stops).
def handleMessage()
  #Get the message length as an unsigned integer, then the message itself
  header = recvExactly(4)
  inbuff = header.nil? ? nil : recvExactly(header.unpack('N')[0])
  if inbuff.nil?
    #The peer closed the connection before a full message arrived
    @connected = false
    return nil
  end
  #Byte that indicates message type
  control = inbuff.unpack('C')[0]
  payload = inbuff[1, inbuff.length - 1]
  if control == ATTRIBUTE_ALIAS
    decodeAttributeAlias(payload)
  elsif control == ORIGIN_ALIAS
    decodeOriginAlias(payload)
  elsif control == REQUEST_COMPLETE
    ticket = decodeTicketMessage(payload)
    #Mark the corresponding request as complete by appending a nil value
    @promise_mutex.synchronize do
      if (@next_data.has_key? ticket)
        @next_data[ticket].push(nil)
      end
    end
  elsif control == DATA_RESPONSE
    data = decodeDataResponse(payload)
    @promise_mutex.synchronize do
      pending = @next_data[data.ticket]
      #Skip requests that were cancelled (no entry) or already marked
      #complete (trailing nil) instead of raising inside the listener.
      if pending && !pending[-1].nil?
        pending[-1].store(data.uri, data.attributes)
        #Add a new entry for the next value (a step response may be
        #waiting for it); single-response requests keep filling one entry.
        if (not @single_response[data.ticket])
          pending.push({})
        end
      end
    end
  elsif control == URI_RESPONSE
    uris = decodeURIResponse(payload)
    @promise_mutex.synchronize do
      #URI responses carry no ticket; they are matched to searches in the
      #order the searches were issued.
      uri_ticket = @uri_search_keys.shift
      puts "Finishing uri response for ticket #{uri_ticket}"
      pending = @next_data[uri_ticket]
      #Ignore a response that has no outstanding search to receive it
      if pending
        unless pending[-1].nil?
          #Make world model entries with no attributes for each URI
          uris.each{|uri| pending[-1].store(uri, [])}
        end
        #This request is complete now so push a nil value to finish it
        pending.push(nil)
      end
    end
  end
  return control
end

# Reads exactly num_bytes bytes from @socket, looping over short reads.
# Returns the bytes, or nil if the peer closes the connection first.
def recvExactly(num_bytes)
  buffer = String.new
  while buffer.bytesize < num_bytes
    received = @socket.recvfrom(num_bytes - buffer.bytesize)
    chunk = received && received[0]
    #A nil or empty read on a stream socket means end of stream
    return nil if chunk.nil? || chunk.empty?
    buffer += chunk
  end
  buffer
end
private :recvExactly

#hasError(key) ⇒ Object

Check if a request has an error. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.



221
222
223
224
225
# File 'lib/libowl/client_world_connection.rb', line 221

# Tells whether an error has been recorded for the request identified by key.
#
# key - ticket number of the request.
#
# Returns true when @request_errors holds an entry for key, false otherwise.
# The lookup is done while holding @promise_mutex.
def hasError(key)
  @promise_mutex.synchronize { @request_errors.key?(key) }
end

#hasNext(key) ⇒ Object

getNext should only be called if hasNext is true, otherwise the future will be given an exception since there is no data. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.



188
189
190
191
192
# File 'lib/libowl/client_world_connection.rb', line 188

# Tells whether getNext(key) has a value ready to hand out.
#
# key - ticket number of the request.
#
# Returns true when @next_data has an entry for key holding more than one
# element, false otherwise. The check is done while holding @promise_mutex.
def hasNext(key)
  @promise_mutex.synchronize do
    @next_data.key?(key) && @next_data[key].size > 1
  end
end

#isComplete(key) ⇒ Object

See if a request is still being serviced (only for StepResponse - regular requests can’t be cancelled since they only have a single response).



170
171
172
173
174
175
176
177
178
179
180
# File 'lib/libowl/client_world_connection.rb', line 170

# Tells whether the request identified by key has finished.
#
# key - ticket number of the request.
#
# Returns true when @next_data has no entry for key, or when the last
# element queued for it is nil (the completion marker). Returns false when
# the queue is empty or ends in a non-nil value. The check is done while
# holding @promise_mutex.
def isComplete(key)
  @promise_mutex.synchronize do
    next true unless @next_data.has_key?(key)

    pending = @next_data[key]
    !pending.empty? && pending[-1].nil?
  end
end

#rangeRequest(name_pattern, attribute_patterns, start_time, stop_time) ⇒ Object

Issue a range request, returning a StepResponse object for the request.



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/libowl/client_world_connection.rb', line 335

# Issues a range request and returns a StepResponse for it.
#
# name_pattern       - object name pattern, encoded with strToSizedUTF16.
# attribute_patterns - collection of attribute patterns; its length is sent
#                      first, then each pattern encoded with strToSizedUTF16.
# start_time         - start of the range, encoded with packuint64.
# stop_time          - end of the range, encoded with packuint64.
#
# A fresh ticket is taken from @cur_key under @promise_mutex and the request
# is registered as multi-response with one empty slot in @next_data.
#
# Returns StepResponse.new(self, ticket).
def rangeRequest(name_pattern, attribute_patterns, start_time, stop_time)
  # Reserve a ticket and mark the request active.
  ticket = @promise_mutex.synchronize do
    issued = @cur_key
    @cur_key += 1
    @single_response[issued] = false
    @next_data[issued] = [{}]
    issued
  end

  # Collect the message fields in wire order, then concatenate them.
  pieces = [
    [RANGE_REQUEST].pack('C'),
    [ticket].pack('N'),
    strToSizedUTF16(name_pattern),
    [attribute_patterns.length].pack('N')
  ]
  pieces.concat(attribute_patterns.map { |pattern| strToSizedUTF16(pattern) })
  pieces << packuint64(start_time)
  pieces << packuint64(stop_time)
  message = pieces.inject { |acc, piece| acc + piece }

  # Frame: 4-byte length followed by the message.
  @socket.send([message.length].pack('N') + message, 0)
  StepResponse.new(self, ticket)
end

#setOriginPreference(origin_weights) ⇒ Object

Set a preference in the world model for certain origins.



413
414
415
416
417
418
419
420
421
422
423
# File 'lib/libowl/client_world_connection.rb', line 413

# Sends the world model a set of origin preferences.
#
# origin_weights - collection of pairs; element [0] of each pair is the
#                  origin name (encoded with strToSizedUTF16) and element [1]
#                  is its weight (packed as a 4-byte big-endian integer).
#
# Returns whatever @socket.send returns.
def setOriginPreference(origin_weights)
  header = [ORIGIN_PREFERENCE].pack('C')
  # 'N' produces the same four bytes for signed and unsigned values, so
  # negative weights are packed without special handling.
  message = origin_weights.inject(header) do |acc, ow|
    acc + (strToSizedUTF16(ow[0]) + [ow[1]].pack('N'))
  end
  # Frame: 4-byte length followed by the message.
  @socket.send([message.length].pack('N') + message, 0)
end

#snapshotRequest(name_pattern, attribute_patterns, start_time = 0, stop_time = 0) ⇒ Object

Issue a snapshot request, returning a Response object for the request.



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/libowl/client_world_connection.rb', line 307

# Issues a snapshot request and returns a Response for it.
#
# name_pattern       - object name pattern, encoded with strToSizedUTF16.
# attribute_patterns - collection of attribute patterns; its length is sent
#                      first, then each pattern encoded with strToSizedUTF16.
# start_time         - encoded with packuint64 (default 0).
# stop_time          - encoded with packuint64 (default 0).
#
# A fresh ticket is taken from @cur_key under @promise_mutex and the request
# is registered as single-response with one empty slot in @next_data.
#
# Returns Response.new(self, ticket).
def snapshotRequest(name_pattern, attribute_patterns, start_time = 0, stop_time = 0)
  # Reserve a ticket and mark the request active.
  ticket = nil
  @promise_mutex.synchronize do
    ticket = @cur_key
    @cur_key = ticket + 1
    @single_response[ticket] = true
    @next_data[ticket] = [{}]
  end

  # Fixed leading fields, then one sized string per attribute pattern,
  # then the two time values.
  message = [SNAPSHOT_REQUEST].pack('C') + [ticket].pack('N') +
            strToSizedUTF16(name_pattern) + [attribute_patterns.length].pack('N')
  message = attribute_patterns.inject(message) do |acc, pattern|
    acc + strToSizedUTF16(pattern)
  end
  message = message + packuint64(start_time) + packuint64(stop_time)

  # Frame: 4-byte length followed by the message.
  @socket.send([message.length].pack('N') + message, 0)
  Response.new(self, ticket)
end

#streamRequest(name_pattern, attribute_patterns, update_interval) ⇒ Object

Issue a stream request, returning a StepResponse object for the request.



364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/libowl/client_world_connection.rb', line 364

# Issues a stream request and returns a StepResponse for it.
#
# name_pattern       - object name pattern, encoded with strToSizedUTF16.
# attribute_patterns - collection of attribute patterns; its length is sent
#                      first, then each pattern encoded with strToSizedUTF16.
# update_interval    - encoded with packuint64 as the final field; the
#                      field before it is always sent as 0.
#
# A fresh ticket is taken from @cur_key under @promise_mutex and the request
# is registered as multi-response with one empty slot in @next_data.
#
# Returns StepResponse.new(self, ticket).
def streamRequest(name_pattern, attribute_patterns, update_interval)
  # Reserve a ticket and mark the request active.
  ticket = @promise_mutex.synchronize do
    reserved = @cur_key
    @cur_key = reserved + 1
    @single_response[reserved] = false
    @next_data[reserved] = [{}]
    reserved
  end

  # Build the field list in wire order and fold it into one string.
  encoded_patterns = attribute_patterns.map { |pattern| strToSizedUTF16(pattern) }
  trailer = [packuint64(0), packuint64(update_interval)]
  lead = [STREAM_REQUEST].pack('C') + [ticket].pack('N') +
         strToSizedUTF16(name_pattern) + [attribute_patterns.length].pack('N')
  message = (encoded_patterns + trailer).inject(lead) { |acc, field| acc + field }

  # Frame: 4-byte length followed by the message.
  @socket.send([message.length].pack('N') + message, 0)
  StepResponse.new(self, ticket)
end

#URISearch(name_pattern) ⇒ Object

Search for any objects in the world model matching the given POSIX REGEX pattern.



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/libowl/client_world_connection.rb', line 394

# Issues a URI search and returns a Response for it.
#
# name_pattern - search pattern, encoded with strToUnicode.
#
# A fresh ticket is taken from @cur_key under @promise_mutex, registered as
# single-response with one empty slot in @next_data, and appended to
# @uri_search_keys. The ticket itself is not part of the message that is
# sent.
#
# Returns Response.new(self, ticket).
def URISearch(name_pattern)
  # Reserve a ticket and queue it as the next outstanding URI search.
  ticket = nil
  @promise_mutex.synchronize do
    ticket = @cur_key
    @cur_key = ticket + 1
    @single_response[ticket] = true
    @next_data[ticket] = [{}]
    @uri_search_keys << ticket
  end

  request = [URI_SEARCH].pack('C') + strToUnicode(name_pattern)
  # Frame: 4-byte length followed by the message.
  @socket.send([request.length].pack('N') + request, 0)
  Response.new(self, ticket)
end