Class: ClientWorldConnection
- Inherits:
-
Object
- Object
- ClientWorldConnection
- Defined in:
- lib/libowl/client_world_connection.rb
Overview
A connection between a client and a world model. This class spawns a thread to handle incoming messages and returns instances of the Response and StepResponse classes to fulfill client requests. If a thread cannot be used then an instance of the class ClientWorldModel should be used instead.
Instance Attribute Summary collapse
-
#connected ⇒ Object
readonly
Indicates if this object is successfully connected to a world model.
Instance Method Summary collapse
-
#cancelRequest(ticket_number) ⇒ Object
Cancel a request with the given ticket number.
-
#close ⇒ Object
Close this connection.
-
#decodeAttributeAlias(inbuff) ⇒ Object
Decode attribute alias message.
-
#decodeDataResponse(inbuff) ⇒ Object
Decode a data response message, returning an array of WMData.
-
#decodeOriginAlias(inbuff) ⇒ Object
Decode origin alias message.
-
#decodeTicketMessage(inbuff) ⇒ Object
Decode a ticket message or a request complete message.
-
#decodeURIResponse(inbuff) ⇒ Object
Decode a URI response message, returning an array of WMData.
-
#getError(key) ⇒ Object
Get error (will return std::exception(“No error”) is there is none.
-
#getNext(key) ⇒ Object
Get the data from the response object corresponding to this key.
-
#handleMessage ⇒ Object
Handle a message of currently unknown type.
-
#hasError(key) ⇒ Object
Check if a request has an error.
-
#hasNext(key) ⇒ Object
getNext should only be called if hasNext is true, otherwise the future will be given an exception since there is no data.
-
#initialize(host, port) ⇒ ClientWorldConnection
constructor
Creates a new connection and spawns a thread to call handleMessage automatically.
-
#isComplete(key) ⇒ Object
See if a request is still being serviced (only for StepResponse - regular requests can’t be cancelled since they only have a single response).
-
#rangeRequest(name_pattern, attribute_patterns, start_time, stop_time) ⇒ Object
Issue a range request, returning a Response object for the request.
-
#setOriginPreference(origin_weights) ⇒ Object
Set a preference in the world model for certain origins.
-
#snapshotRequest(name_pattern, attribute_patterns, start_time = 0, stop_time = 0) ⇒ Object
Issue a snapshot request, returning a Response object for the request.
-
#streamRequest(name_pattern, attribute_patterns, update_interval) ⇒ Object
Issue a stream request, returning a StepResponse object for the request.
-
#URISearch(name_pattern) ⇒ Object
Search for any objects in the world model matching the given POSIX REGEX pattern.
Constructor Details
#initialize(host, port) ⇒ ClientWorldConnection
Creates a new connection and spawns a thread to call handleMessage automatically.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/libowl/client_world_connection.rb', line 61 def initialize(host, port) @promise_mutex = Mutex.new @cur_key = 0 @uri_search_keys = [] @single_response = {} @connected = false @host = host @port = port @socket = TCPSocket.open(host, port) handshake = "" ver_string = "GRAIL client protocol" #The handshake is the length of the message, the protocol string, and the version (0). handshake << [ver_string.length].pack('N') << ver_string << "\x00\x00" #Send a handshake and then receive one @socket.send(handshake, 0) inshake = @socket.recvfrom(handshake.length)[0] while (inshake.length < handshake.length) #puts "Waiting for #{handshake.length - inshake.length} byte more of handshake." inshake += @socket.recvfrom(handshake.length - inshake.length)[0] end @connected = true for i in 1..handshake.length if handshake[i] != inshake[i] puts "Handshake failure!" puts "For byte i we sent #{handshake[i]} but got #{inshake[i]}" @connected = false end end @alias_to_attr_name = {} @alias_to_origin_name = {} @next_data = {} @request_errors = {} #Start the listening thread @listen_thread = Thread.new do while (@connected) handleMessage() end end end |
Instance Attribute Details
#connected ⇒ Object (readonly)
Indicates if this object is successfully connected to a world model.
41 42 43 |
# File 'lib/libowl/client_world_connection.rb', line 41 def connected @connected end |
Instance Method Details
#cancelRequest(ticket_number) ⇒ Object
Cancel a request with the given ticket number
426 427 428 429 430 431 432 |
# File 'lib/libowl/client_world_connection.rb', line 426 def cancelRequest(ticket_number) buff = [CANCEL_REQUEST].pack('C') #Now append the ticket number as a 4 byte value buff += [ticket_number].pack('N') #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) end |
#close ⇒ Object
Close this connection
106 107 108 109 |
# File 'lib/libowl/client_world_connection.rb', line 106 def close() @socket.close() @connected = false end |
#decodeAttributeAlias(inbuff) ⇒ Object
Decode attribute alias message
240 241 242 243 244 245 246 247 248 249 |
# File 'lib/libowl/client_world_connection.rb', line 240 def decodeAttributeAlias(inbuff) num_aliases = inbuff.unpack('N')[0] rest = inbuff[4, inbuff.length - 1] for i in 1..num_aliases do attr_alias = rest.unpack('N')[0] name, rest = splitURIFromRest(rest[4, rest.length - 1]) #Assign this name to the given alias @alias_to_attr_name[attr_alias] = name end end |
#decodeDataResponse(inbuff) ⇒ Object
Decode a data response message, returning an array of WMData
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/libowl/client_world_connection.rb', line 285 def decodeDataResponse(inbuff) attributes = [] object_uri, rest = splitURIFromRest(inbuff) ticket = rest.unpack('N')[0] total_attributes = rest[4, rest.length - 1].unpack('N')[0] rest = rest[8, rest.length] #puts "Decoding #{total_attributes} attributes" for i in 1..total_attributes do name_alias = rest.unpack('N')[0] creation_date = unpackuint64(rest[4, rest.length - 1]) expiration_date = unpackuint64(rest[12, rest.length - 1]) origin_alias = rest[20, rest.length - 1].unpack('N')[0] data_len = rest[24, rest.length - 1].unpack('N')[0] data = rest[28, data_len] rest = rest[28+data_len, rest.length - 1] attributes.push(WMAttribute.new(@alias_to_attr_name[name_alias], data, creation_date, expiration_date, @alias_to_origin_name[origin_alias])) end return WMData.new(object_uri, attributes, ticket) end |
#decodeOriginAlias(inbuff) ⇒ Object
Decode origin alias message
252 253 254 255 256 257 258 259 260 261 |
# File 'lib/libowl/client_world_connection.rb', line 252 def decodeOriginAlias(inbuff) num_aliases = inbuff.unpack('N')[0] rest = inbuff[4, inbuff.length - 1] for i in 1..num_aliases do origin_alias = rest.unpack('N')[0] name, rest = splitURIFromRest(rest[4, rest.length - 1]) #Assign this name to the given alias @alias_to_origin_name[origin_alias] = name end end |
#decodeTicketMessage(inbuff) ⇒ Object
Decode a ticket message or a request complete message.
265 266 267 |
# File 'lib/libowl/client_world_connection.rb', line 265 def decodeTicketMessage(inbuff) return inbuff.unpack('N')[0] end |
#decodeURIResponse(inbuff) ⇒ Object
Decode a URI response message, returning an array of WMData
271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/libowl/client_world_connection.rb', line 271 def decodeURIResponse(inbuff) uris = [] if (inbuff != nil) rest = inbuff while (rest.length > 4) name, rest = splitURIFromRest(rest) uris.push(name) end end return uris end |
#getError(key) ⇒ Object
Get error (will return std::exception(“No error”) is there is none
229 230 231 232 233 234 235 236 237 |
# File 'lib/libowl/client_world_connection.rb', line 229 def getError(key) if (not hasError(key)) raise "no error but getError called" else @promise_mutex.synchronize do return @request_errors[key] end end end |
#getNext(key) ⇒ Object
Get the data from the response object corresponding to this key. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/libowl/client_world_connection.rb', line 199 def getNext(key) if (not hasNext(key)) raise "No next value in request" else data = {} @promise_mutex.synchronize do data = @next_data[key].shift end #If there is no more data in this request delete its associatd data if (isComplete(key)) @request_errors.delete key @next_data.delete key end return data end end |
#handleMessage ⇒ Object
Handle a message of currently unknown type. This is automatically handled by a thread spawned at object creation.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/libowl/client_world_connection.rb', line 114 def handleMessage() #puts "Handling message..." #Get the message length as n unsigned integer inlen = (@socket.recvfrom(4)[0]).unpack('N')[0] inbuff = @socket.recvfrom(inlen)[0] #Keep reading until the entire packet is read #TODO This can block forever if a communication error occurs. while (inbuff.length < inlen) inbuff += @socket.recvfrom(inlen-inbuff.length)[0] end #Byte that indicates message type control = inbuff.unpack('C')[0] if control == ATTRIBUTE_ALIAS decodeAttributeAlias(inbuff[1, inbuff.length - 1]) elsif control == ORIGIN_ALIAS decodeOriginAlias(inbuff[1, inbuff.length - 1]) elsif control == REQUEST_COMPLETE ticket = decodeTicketMessage(inbuff[1, inbuff.length-1]) #Mark the corresponding request as complete by appending a nil value @promise_mutex.synchronize do if (@next_data.has_key? ticket) #Need an empty has in case a step response is waiting for a value @next_data[ticket].push(nil) end end elsif control == DATA_RESPONSE data = decodeDataResponse(inbuff[1, inbuff.length - 1]) #If the request was cancelled then don't try to push any more data @promise_mutex.synchronize do if (@next_data.has_key? data.ticket) @next_data[data.ticket][-1].store(data.uri, data.attributes) #Add a new entry for the next value if (not @single_response[data.ticket]) @next_data[data.ticket].push({}) end end end elsif control == URI_RESPONSE uris = decodeURIResponse(inbuff[1, inbuff.length - 1]) @promise_mutex.synchronize do uri_ticket = @uri_search_keys.shift puts "Finishing uri response for ticket #{uri_ticket}" #Make world model entries with no attributes for each URI uris.each{|uri| @next_data[uri_ticket][-1].store(uri, [])} #This request is complete now so push a nil value to finish it @next_data[uri_ticket].push(nil) end end #puts "processed message with id #{control}" return control end |
#hasError(key) ⇒ Object
Check if a request has an error. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.
221 222 223 224 225 |
# File 'lib/libowl/client_world_connection.rb', line 221 def hasError(key) @promise_mutex.synchronize do return (@request_errors.has_key? key) end end |
#hasNext(key) ⇒ Object
getNext should only be called if hasNext is true, otherwise the future will be given an exception since there is no data. The response and stepResponse classes can call this directly so there usually won’t be a need for a developer to call this function directly.
188 189 190 191 192 |
# File 'lib/libowl/client_world_connection.rb', line 188 def hasNext(key) @promise_mutex.synchronize do return ((@next_data.has_key? key) and (@next_data[key].length > 1)) end end |
#isComplete(key) ⇒ Object
See if a request is still being serviced (only for StepResponse - regular requests can’t be cancelled since they only have a single response).
170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/libowl/client_world_connection.rb', line 170 def isComplete(key) @promise_mutex.synchronize do if ((not @next_data.has_key?(key))) return true elsif (@next_data[key].empty?) return false else return (nil == @next_data[key][-1]) end end end |
#rangeRequest(name_pattern, attribute_patterns, start_time, stop_time) ⇒ Object
Issue a range request, returning a Response object for the request.
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'lib/libowl/client_world_connection.rb', line 335 def rangeRequest(name_pattern, attribute_patterns, start_time, stop_time) #Set up a ticket and mark this request as active by adding it to next_data ticket = 0 @promise_mutex.synchronize do ticket = @cur_key @cur_key += 1 @single_response.store(ticket, false) @next_data[ticket] = [{}] end buff = [RANGE_REQUEST].pack('C') buff += [ticket].pack('N') buff += strToSizedUTF16(name_pattern) buff += [attribute_patterns.length].pack('N') attribute_patterns.each{|pattern| buff += strToSizedUTF16(pattern) } buff += packuint64(start_time) buff += packuint64(stop_time) #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) return StepResponse.new(self, ticket) end |
#setOriginPreference(origin_weights) ⇒ Object
Set a preference in the world model for certain origins.
413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/libowl/client_world_connection.rb', line 413 def setOriginPreference(origin_weights) buff = [ORIGIN_PREFERENCE].pack('C') #Each origin weight should be a pair of a name and a value origin_weights.each{|ow| #It's okay to pack using N since this operates the same #for signed and unsigned values. buff += strToSizedUTF16(ow[0]) + [ow[1]].pack('N') } #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) end |
#snapshotRequest(name_pattern, attribute_patterns, start_time = 0, stop_time = 0) ⇒ Object
Issue a snapshot request, returning a Response object for the request.
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/libowl/client_world_connection.rb', line 307 def snapshotRequest(name_pattern, attribute_patterns, start_time = 0, stop_time = 0) #Set up a ticket and mark this request as active by adding it to next_data ticket = 0 @promise_mutex.synchronize do ticket = @cur_key @cur_key += 1 @single_response.store(ticket, true) @next_data[ticket] = [{}] end buff = [SNAPSHOT_REQUEST].pack('C') buff += [ticket].pack('N') buff += strToSizedUTF16(name_pattern) buff += [attribute_patterns.length].pack('N') attribute_patterns.each{|pattern| buff += strToSizedUTF16(pattern) } buff += packuint64(start_time) buff += packuint64(stop_time) #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) return Response.new(self, ticket) end |
#streamRequest(name_pattern, attribute_patterns, update_interval) ⇒ Object
Issue a stream request, returning a StepResponse object for the request.
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
# File 'lib/libowl/client_world_connection.rb', line 364 def streamRequest(name_pattern, attribute_patterns, update_interval) #Set up a ticket and mark this request as active by adding it to next_data ticket = 0 @promise_mutex.synchronize do ticket = @cur_key @cur_key += 1 @single_response.store(ticket, false) @next_data[ticket] = [{}] end buff = [STREAM_REQUEST].pack('C') buff += [ticket].pack('N') buff += strToSizedUTF16(name_pattern) buff += [attribute_patterns.length].pack('N') attribute_patterns.each{|pattern| buff += strToSizedUTF16(pattern) } buff += packuint64(0) buff += packuint64(update_interval) #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) return StepResponse.new(self, ticket) end |
#URISearch(name_pattern) ⇒ Object
Search for any objects in the world model matching the given POSIX REGEX pattern.
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/libowl/client_world_connection.rb', line 394 def URISearch(name_pattern) #Set up a ticket and mark this request as active by adding it to next_data ticket = 0 @promise_mutex.synchronize do ticket = @cur_key @cur_key += 1 @single_response.store(ticket, true) @next_data[ticket] = [{}] @uri_search_keys.push(ticket) end buff = [URI_SEARCH].pack('C') buff += strToUnicode(name_pattern) #Send the message with its length prepended to the front @socket.send("#{[buff.length].pack('N')}#{buff}", 0) return Response.new(self, ticket) end |