Class: Legs

Inherits:
Object
  • Object
show all
Defined in:
lib/legs.rb

Defined Under Namespace

Classes: RemoteError, RequestError, Result, StartBlockError

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 30274) ⇒ Legs

Legs.new for a client connected to some other legs server



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/legs.rb', line 11

def initialize(host = 'localhost', port = 30274)
  self.class.start(port) if self.class != Legs && !self.class.started?
  @parent = false; @responses = Hash.new; @meta = {}; @disconnected = false
  @responses_mutex = Mutex.new; @socket_mutex = Mutex.new
  
  if host.instance_of?(TCPSocket)
    @socket = host
    @parent = port unless port.instance_of?(Numeric)
  elsif host.instance_of?(String)
    @socket = TCPSocket.new(host, port)
    self.class.outgoing_mutex.synchronize { self.class.outgoing.push self }
  else
    raise "First argument needs to be a hostname, ip, or socket"
  end
  
  
  @handle_data = Proc.new do |data|
    data = json_restore(JSON.parse(data))
    
    if data['method']
      (@parent || self.class).__data!(data, self)
    elsif data['error'] and data['id'].nil?
      raise data['error']
    else
      @responses_mutex.synchronize { @responses[data['id']] = data }
    end
  end
  
  @thread = Thread.new do
    until @socket.closed?
      begin
        close! if @socket.eof?
        data = nil
        @socket_mutex.synchronize { data = @socket.gets(self.class.terminator) rescue nil }
        if data.nil?
          close!
        else
          @handle_data[data]
        end
      rescue JSON::ParserError => e
        send_data!({"error" => "JSON provided is invalid. See http://json.org/ to see how to format correctly."})
      rescue IOError => e
        close!
      end
    end
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missingObject

sends a normal RPC request that has a response catch all the rogue calls and make them work niftily



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/legs.rb', line 108

def send!(method, *args, &blk)
  puts "Call #{self.inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  id = get_unique_number
  send_data! 'method' => method.to_s, 'params' => args, 'id' => id
  
  worker = Proc.new do
    sleep 0.1 until @responses_mutex.synchronize { @responses.keys.include?(id) }
    
    result = Legs::Result.new(@responses_mutex.synchronize { @responses.delete(id) })
    puts ">> #{method} #=> #{result.data['result'].inspect}" if self.class.log?
    result
  end
  
  if blk.respond_to?(:call); Thread.new { blk[worker.call] }
  else; worker.call.value; end
end

Class Attribute Details

.incomingObject (readonly) Also known as: users

Returns the value of attribute incoming.



204
205
206
# File 'lib/legs.rb', line 204

def incoming
  @incoming
end

.incoming_mutexObject (readonly)

Returns the value of attribute incoming_mutex.



204
205
206
# File 'lib/legs.rb', line 204

def incoming_mutex
  @incoming_mutex
end

.logObject Also known as: log?

Returns the value of attribute log.



203
204
205
# File 'lib/legs.rb', line 203

def log
  @log
end

.messages_mutexObject (readonly)

Returns the value of attribute messages_mutex.



204
205
206
# File 'lib/legs.rb', line 204

def messages_mutex
  @messages_mutex
end

.outgoingObject (readonly)

Returns the value of attribute outgoing.



204
205
206
# File 'lib/legs.rb', line 204

def outgoing
  @outgoing
end

.outgoing_mutexObject (readonly)

Returns the value of attribute outgoing_mutex.



204
205
206
# File 'lib/legs.rb', line 204

def outgoing_mutex
  @outgoing_mutex
end

.server_objectObject (readonly)

Returns the value of attribute server_object.



204
205
206
# File 'lib/legs.rb', line 204

def server_object
  @server_object
end

.terminatorObject

Returns the value of attribute terminator.



203
204
205
# File 'lib/legs.rb', line 203

def terminator
  @terminator
end

Instance Attribute Details

#metaObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def meta
  @meta
end

#parentObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def parent
  @parent
end

#socketObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def socket
  @socket
end

Class Method Details

.__data!(data, from) ⇒ Object

gets called to handle all incoming messages (RPC requests)



350
351
352
# File 'lib/legs.rb', line 350

def __data!(data, from)
  @messages.enq [data, from]
end

.__make_symbol(name) ⇒ Object

lets the marshaler transport symbols



375
# File 'lib/legs.rb', line 375

def __make_symbol(name); name.to_sym; end

.add_block(name, &block) ⇒ Object

add’s a block to the ‘server’ class in a way that retains it’s old bindings. the block will be passed the caller object, followed by the args.



366
367
368
369
370
371
372
# File 'lib/legs.rb', line 366

def add_block(name, &block)
  @server_class.class_eval do
    define_method(name) do |*args|
      block.call caller, *args
    end
  end
end

.connectionsObject

returns an array of all connections



339
340
341
# File 'lib/legs.rb', line 339

def connections
  @incoming + @outgoing
end

.define_method(name, &blk) ⇒ Object

add’s a method to the ‘server’ class, bound in to that class



362
# File 'lib/legs.rb', line 362

def define_method(name, &blk); @server_class.class_eval { define_method(name, &blk) }; end

.event(name, sender, *extras) ⇒ Object

add an event call to the server’s message queue



344
345
346
347
# File 'lib/legs.rb', line 344

def event(name, sender, *extras)
  return unless @server_object.respond_to?("on_#{name}")
  __data!({'method' => "on_#{name}", 'params' => extras.to_a, 'id' => nil}, sender)
end

.finalize(incoming, state) ⇒ Object

implemented on class to work around enclosure stuff with object finalizer



333
334
335
336
# File 'lib/legs.rb', line 333

def self.finalize(incoming, state)
  state[:started] = false
  incoming.each { |user| user.close! }
end

.initializerObject



209
210
211
212
213
214
# File 'lib/legs.rb', line 209

def initializer
  @incoming = []; @outgoing = []; @messages = Queue.new; @terminator = "\n"; @log = false
  @incoming_mutex = Mutex.new; @outgoing_mutex = Mutex.new; @state = { :started => false}
  
  ObjectSpace.define_finalizer(self) { self.class.finalize(@incoming, @state) }
end

.open(*args) {|client| ... } ⇒ Object

People say this syntax is too funny not to have… whatever. Works like IO and File and what have you

Yields:

  • (client)


355
356
357
358
359
# File 'lib/legs.rb', line 355

def open(*args)
  client = Legs.new(*args)
  yield(client)
  client.close!
end

.start(port = 30274, &blk) ⇒ Object

starts the server, pass nil for port to make a ‘server’ that doesn’t actually accept connections This is useful for adding methods to Legs so that systems you connect to can call methods back on you



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/legs.rb', line 219

def start(port=30274, &blk)
  return @server_class.module_eval(&blk) if started? and blk.respond_to? :call
  @state[:started] = true
  
  # makes a nice clean class to hold all the server methods.
  if @server_class.nil?
    @server_class = Class.new
    @server_class.module_eval do
      private
      attr_reader :server, :caller
      
      # sends a notification message to all connected clients
      def broadcast(*args)
        if args.first.is_a?(Array)
          list = args.shift
          method = args.shift
        elsif args.first.is_a?(String) or args.first.is_a?(Symbol)
          list = server.incoming
          method = args.shift
        else
          raise "You need to specify a 'method' to broadcast out to"
        end
        
        list.each { |user| user.notify!(method, *args) }
      end
      
      # Finds a user by the value of a certain property... like find_user_by :object_id, 12345
      def find_user_by_object_id value
        server.incoming.find { |user| user.object_id == value }
      end
      
      # finds user's with the specified meta keys matching the specified values, can use regexps and stuff, like a case block
      def find_users_by_meta hash = nil
        raise "You need to give find_users_by_meta a hash to check the user's meta hash against" if hash.nil?
        server.incoming.select do |user|
          hash.all? { |key, value| value === user.meta[key] }
        end
      end
      
      public # makes it public again for the user code
    end
  end
  
  @server_class.module_eval(&blk) if blk.respond_to?(:call)
  
  if @server_object.nil?
    @server_object = @server_class.allocate
    @server_object.instance_variable_set(:@server, self)
    @server_object.instance_eval { initialize }
  end

  @message_processor = Thread.new do
    while started?
      sleep 0.01 while @messages.empty?
      data, from = @messages.deq
      method = data['method']; params = data['params']
      methods = @server_object.public_methods(false).map { |i| i.to_s }
      
      # close dead connections
      if data['method'] == '**remote__disconnecting**'
        from.close!
        next
      else
        begin
          raise "Supplied method is not a String" unless method.is_a?(String)
          raise "Supplied params object is not an Array" unless params.is_a?(Array)
          raise "Cannot run '#{method}' because it is not defined in this server" unless methods.include?(method.to_s) or methods.include? :method_missing
          
          puts "Call #{method}(#{params.map(&:inspect).join(', ')})" if log?
          
          @server_object.instance_variable_set(:@caller, from)
          
          result = nil
          
          @incoming_mutex.synchronize do
            if methods.include?(method.to_s)
              result = @server_object.__send__(method.to_s, *params)
            else
              result = @server_object.method_missing(method.to_s, *params)
            end
          end
          
          puts ">> #{method} #=> #{result.inspect}" if log?
          
          from.send_data!({'id' => data['id'], 'result' => result}) unless data['id'].nil?
          
        rescue Exception => e
          from.send_data!({'error' => e, 'id' => data['id']}) unless data['id'].nil?
          puts "Error: #{e}\nBacktrace: " + e.backtrace.join("\n   ") if log?
        end
      end
    end
  end unless @message_processor and @message_processor.alive?
  
  if ( port.nil? or port == false ) == false and @listener.nil?
    @listener = TCPServer.new(port)
    
    @acceptor_thread = Thread.new do
      while started?
        user = Legs.new(@listener.accept, self)
        @incoming_mutex.synchronize { @incoming.push user }
        puts "User #{user.object_id} connected, number of users: #{@incoming.length}" if log?
        self.event :connect, user
      end
    end
  end
end

.started?Boolean

Returns:

  • (Boolean)


207
# File 'lib/legs.rb', line 207

def started?; @state[:started]; end

.stopObject

stops the server, disconnects the clients



328
329
330
# File 'lib/legs.rb', line 328

def stop
  self.class.finalize(@incoming, @state)
end

Instance Method Details

#close!Object

closes the connection and the threads and stuff for this user



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/legs.rb', line 63

def close!
  return if @disconnected == true
  
  @disconnected = true
  puts "User #{inspect} disconnecting" if self.class.log?
  
  # notify the remote side
  notify!('**remote__disconnecting**') rescue nil
  
  if @parent
    @parent.event(:disconnect, self)
    @parent.incoming_mutex.synchronize { @parent.incoming.delete(self) }
  else
    self.class.outgoing_mutex.synchronize { self.class.outgoing.delete(self) }
  end
  
  #Thread.new { sleep(1); @socket.close rescue nil }
  @socket.close
end

#connected?Boolean

I think you can guess this one

Returns:

  • (Boolean)


60
# File 'lib/legs.rb', line 60

def connected?; self.class.connections.include?(self); end

#inspectObject



8
# File 'lib/legs.rb', line 8

def inspect; "<Legs:#{object_id} Meta: #{@meta.inspect}>"; end

#notify!(method, *args, &blk) ⇒ Object

send a notification to this user



84
85
86
87
# File 'lib/legs.rb', line 84

def notify!(method, *args, &blk)
  puts "Notify #{inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  send_data!({'method' => method.to_s, 'params' => args, 'id' => nil})
end

#send!(method, *args, &blk) ⇒ Object Also known as: method_missing

sends a normal RPC request that has a response



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/legs.rb', line 90

def send!(method, *args, &blk)
  puts "Call #{self.inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  id = get_unique_number
  send_data! 'method' => method.to_s, 'params' => args, 'id' => id
  
  worker = Proc.new do
    sleep 0.1 until @responses_mutex.synchronize { @responses.keys.include?(id) }
    
    result = Legs::Result.new(@responses_mutex.synchronize { @responses.delete(id) })
    puts ">> #{method} #=> #{result.data['result'].inspect}" if self.class.log?
    result
  end
  
  if blk.respond_to?(:call); Thread.new { blk[worker.call] }
  else; worker.call.value; end
end

#send_data!(data) ⇒ Object

sends raw object over the socket



111
112
113
114
115
# File 'lib/legs.rb', line 111

def send_data!(data)
  raise "Lost remote connection" unless connected?
  raw = JSON.generate(json_marshal(data)) + self.class.terminator
  @socket_mutex.synchronize { @socket.write(raw) }
end