Class: Rake::Subproject::Client::SessionManager
- Inherits:
-
Object
- Object
- Rake::Subproject::Client::SessionManager
- Defined in:
- lib/rake/subproject/client/session_manager.rb
Overview
:nodoc: all
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #create_session ⇒ Object
-
#initialize(port) ⇒ SessionManager
constructor
A new instance of SessionManager.
- #start(&block) ⇒ Object
- #with_session ⇒ Object
Constructor Details
#initialize(port) ⇒ SessionManager
Returns a new instance of SessionManager.
5 6 7 8 9 |
# File 'lib/rake/subproject/client/session_manager.rb', line 5
# Builds a manager around +port+ with an empty session-queue table.
#
# @param port [Port] the port this manager reads from and writes to
# @raise [ArgumentError] when +port+ is not a Port
def initialize(port)
  unless port.is_a?(Port)
    raise ArgumentError, "Requires a Port object"
  end

  # One entry per known session id; filled by #start and #create_session.
  @session_queues = {}
  @port = port
end
Class Method Details
.with_each_session(port, &block) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/rake/subproject/client/session_manager.rb', line 11
# Creates a manager for +port+, runs its read loop, and calls +block+ in a
# fresh thread for every session the loop yields. When the loop ends (or
# raises), waits for the threads that are still registered and then closes
# the manager.
#
# @param port [Port] passed straight to SessionManager.new
# @yield [session] invoked once per session, each on its own thread
# @return [nil] immediately when no block is given; otherwise whatever
#   #start returns
def self.with_each_session(port, &block)
  # The guard and the construction are deliberately outside the
  # begin/ensure: with a method-level ensure, an early return or a
  # constructor failure ran the cleanup against nil locals and raised
  # NoMethodError instead of returning nil / reporting the real error.
  return unless block_given?

  session_manager = self.new(port)
  threads = Set.new
  mutex = Mutex.new

  begin
    session_manager.start do |session|
      # Holding the mutex while starting the thread guarantees the thread is
      # in the set before its own delete (which needs the same mutex) runs.
      mutex.synchronize do
        thread = Thread.start do
          block.call(session)
          # Only reached on success; a thread whose block raised stays in
          # the set so the join below re-raises its error.
          mutex.synchronize { threads.delete thread }
        end
        threads << thread
      end
    end
  ensure
    # Snapshot under the lock so the count and the join list agree.
    pending = mutex.synchronize { threads.dup }
    log "Waiting for #{pending.count} threads"
    pending.each(&:join)
    session_manager.close
  end
end
Instance Method Details
#close ⇒ Object
73 74 75 |
# File 'lib/rake/subproject/client/session_manager.rb', line 73
# Logs that this manager is closing, including the inspected port.
# Nothing else is released here.
def close
  port_description = @port.inspect
  log("session manager: closing #{port_description}")
end
#create_session ⇒ Object
66 67 68 69 70 71 |
# File 'lib/rake/subproject/client/session_manager.rb', line 66
# Creates a new Session owned by this manager, registers its inbound queue,
# and announces it on the port with a 'create_session' message.
#
# @return [Session] the newly created session
# @raise whatever @port.write raises; in that case the queue registered for
#   the session is removed again before the error propagates
def create_session
  session = Session.send(:new, self)

  # Register the queue BEFORE announcing the session. #start fails with
  # "No session for ..." when a 'message_session' arrives for an id that has
  # no queue, so a reply that lands right after the write must already find
  # one. NOTE(review): assumes the read loop runs on another thread - confirm.
  @session_queues[session.id] = Queue.new
  begin
    @port.write(message: 'create_session', session_id: session.id)
  rescue StandardError
    # Keep the table free of sessions that were never announced.
    @session_queues.delete(session.id)
    raise
  end

  session
end
#start(&block) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/rake/subproject/client/session_manager.rb', line 30
# Runs the read loop: reads envelopes from the port until it raises IOError
# (EOFError included) and dispatches each one by its 'message' tag.
#
# * 'create_session'  - builds a Session for the envelope's session id,
#   registers a queue for it, and hands the session to +block+ (if given).
# * 'message_session' - pushes the envelope's 'payload' onto the queue
#   registered for that session id.
# * anything else     - logged and skipped.
#
# @yield [session] called for every 'create_session' envelope
# @return [nil] once the port stops delivering envelopes
# @raise [RuntimeError] when an envelope has no 'message', has a missing or
#   zero 'session_id', or addresses a session with no registered queue
def start(&block)
  log "starting read loop on #{@port.inspect}"
  loop do
    envelope = @port.read

    message = envelope['message']
    fail "no 'message' tag in envelope" if message.nil?

    raw_id = envelope['session_id']
    fail "no 'session_id' tag in envelope" if raw_id.nil? || raw_id == "0"
    # to_i yields 0 for non-numeric strings, so this also rejects garbage ids.
    session_id = raw_id.to_i
    fail "null(0) 'session_id' tag in envelope" if session_id == 0

    case message
    when 'create_session'
      session = Session.new(self, session_id)
      @session_queues[session.id] = Queue.new
      block.call(session) unless block.nil?
    when 'message_session'
      queue = @session_queues[session_id]
      fail "No session for #{session_id}" if queue.nil?
      queue << envelope['payload']
    else
      # The previous guard here re-tested session_id, which is always valid
      # at this point, so unknown messages were dropped without a trace.
      # Report them, but keep the loop alive as before.
      log "did not recognize message: '#{message}'"
    end
  end
rescue IOError
  # EOFError is a subclass of IOError: the port closing is the normal way
  # for the loop to end, so it is not treated as a failure.
end
#with_session ⇒ Object
59 60 61 62 63 64 |
# File 'lib/rake/subproject/client/session_manager.rb', line 59
# Creates a session, yields it to the caller's block, and closes it once the
# block finishes or raises. Does nothing when no block is given.
#
# @yield [session] the session created by #create_session
# @return the block's value, or nil when called without a block
def with_session
  return unless block_given?

  session = create_session
  yield session
ensure
  # session is still nil if the guard returned early or creation failed.
  session.close unless session.nil?
end