Class: Pmux::MRSession

Inherits:
MultiSession show all
Includes:
MessageReceiver
Defined in:
lib/pmux/multi_session.rb

Instance Attribute Summary

Attributes inherited from MultiSession

#loop, #timeout

Instance Method Summary collapse

Methods inherited from MultiSession

#close_channel, #connect_to_addr, #on_error, #process_scp_queue_once, #scp_download, #scp_download_sub, #scp_upload, #scp_upload_files, #scp_upload_sub

Constructor Details

#initialize(addrs, options = {}, loop = nil) ⇒ MRSession

Returns a new instance of MRSession.



249
250
251
252
253
254
255
256
257
# File 'lib/pmux/multi_session.rb', line 249

# Builds the session and prepares the command line used to start the
# remote server process.
#
# All three arguments are forwarded unchanged to the superclass constructor
# through the bare +super+ call.
#
# @param addrs   passed to the superclass (presumably the host list that
#                +connect+ later reads from @addrs - confirm in MultiSession)
# @param options [Hash] +:program_name+ overrides the executable name;
#                'pmux' is used when it is absent or nil
# @param loop    passed to the superclass
def initialize(addrs, options = {}, loop = nil)
  super

  # Executable that is started in server mode on each host.
  executable = options[:program_name] || 'pmux'
  @cmd = "#{executable} --server"

  @seqid = 0      # initial sequence number (presumably for message ids - confirm)
  @reqtable = {}  # msgid => future for requests still awaiting a response
end

Instance Method Details

#call_async(addr, method, *args) ⇒ Object



284
285
286
# File 'lib/pmux/multi_session.rb', line 284

# Sends one asynchronous request to a single address.
#
# @param addr   destination address
# @param method name of the remote method
# @param args   remaining arguments; they are handed to +send_request+ as
#               one array, not splatted
# @return whatever +send_request+ returns
def call_async(addr, method, *args)
  send_request(addr, method, args)
end

#connect ⇒ Object



259
260
261
262
263
# File 'lib/pmux/multi_session.rb', line 259

# Calls +connect_to_addr+ once for every entry in @addrs, passing the
# server command with that entry appended as an --ipaddr option.
#
# @return the result of iterating @addrs (the array itself for an Array)
def connect
  @addrs.each do |host|
    ipaddr_option = " --ipaddr=#{host}"
    connect_to_addr(host, @cmd + ipaddr_option)
  end
end

#error_on_addr(addr, err = nil) ⇒ Object



276
277
278
279
280
281
282
# File 'lib/pmux/multi_session.rb', line 276

# Fails every pending request addressed to +addr+.
#
# The superclass implementation runs first and receives the arguments as
# given (so +err+ may still be nil there). Afterwards each future in
# @reqtable whose +addr+ matches is removed from the table and completed
# with the error and a nil result.
#
# Removing the entries keeps @reqtable from accumulating finished requests
# and ensures a future is completed only once when this method is invoked
# repeatedly for the same address.
#
# @param addr address whose pending requests should be failed
# @param err  error value handed to the futures; 'closed' when nil or false
# @return the collection of [msgid, future] entries that were failed
def error_on_addr(addr, err = nil)
  super
  err ||= 'closed'

  failed = @reqtable.select { |_msgid, future| future.addr == addr }
  # Unregister before completing: set_result may run code that touches
  # @reqtable, and the table must not be modified while it is being walked.
  failed.each { |msgid, _future| @reqtable.delete(msgid) }
  failed.each { |_msgid, future| future.set_result(err, nil) }
end

#multicast_call_async(method, *args) ⇒ Object



288
289
290
291
292
293
294
295
# File 'lib/pmux/multi_session.rb', line 288

# Sends the same asynchronous request to every address in @addrs.
#
# @param method name of the remote method
# @param args   remaining arguments; handed to +send_request+ as one array
# @return [MR::MultiFuture] holds the value returned by +send_request+ for
#   each address, added in @addrs order
def multicast_call_async(method, *args)
  combined = MR::MultiFuture.new
  @addrs.each do |host|
    combined.add(send_request(host, method, args))
  end
  combined
end

#on_response(msgid, error, result) ⇒ Object



297
298
299
300
301
# File 'lib/pmux/multi_session.rb', line 297

# Completes the pending request identified by +msgid+.
#
# The future is removed from @reqtable and then given the error/result
# pair. A msgid with no entry in the table is ignored.
#
# @param msgid  key of the request in @reqtable
# @param error  error value passed to the future
# @param result result value passed to the future
# @return the value of +set_result+, or nil when no request matched
def on_response(msgid, error, result)
  pending = @reqtable.delete(msgid)
  return unless pending

  pending.set_result(error, result)
end

#setup_channel(ch) ⇒ Object



265
266
267
268
269
270
271
272
273
274
# File 'lib/pmux/multi_session.rb', line 265

# Registers the data, extended-data and close callbacks on a channel.
#
# @param ch channel object; must respond to on_data, on_extended_data,
#           on_close and connection
# @return the value returned by +ch.on_close+
def setup_channel(ch)
  unpacker = MessagePack::Unpacker.new

  # Incoming data is fed to the unpacker; every object it yields is passed
  # to on_message.
  ch.on_data do |_c, chunk|
    unpacker.feed_each(chunk) { |message| on_message(message) }
  end

  # Extended data is received and dropped.
  ch.on_extended_data do |_c, _type, _data|
    # (debug) STDERR.puts c.connection.host+': '+data
  end

  # A closed channel is reported as an error for the host of +ch+.
  ch.on_close { |_c| error_on_addr(ch.connection.host) }
end