Class: GrpcKit::Session::ServerSession

Inherits:
DS9::Server
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/grpc_kit/session/server_session.rb

Instance Method Summary collapse

Constructor Details

#initialize(io, dispatcher) ⇒ ServerSession

Returns a new instance of ServerSession.



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/grpc_kit/session/server_session.rb', line 19

# Builds a server session around the given IO object.
#
# @param io [Object] kept as the session's IO; elsewhere in this class it
#   receives +wait_readable+ and +close+
# @param dispatcher [Object] kept as-is for later use
def initialize(io, dispatcher)
  # Run the parent initializer first; the empty parens forward no arguments.
  super()

  # Collaborators handed in by the caller.
  @io = io
  @dispatcher = dispatcher

  # Per-session bookkeeping containers.
  @streams = {}
  @inflights = []

  # Lifecycle state: nothing stopped, no peer shutdown seen, no drain requested.
  @stop = false
  @peer_shutdowned = false
  @drain = nil
end

Instance Method Details

#drain ⇒ Object



81
82
83
# File 'lib/grpc_kit/session/server_session.rb', line 81

# Returns the session's drain controller, creating it on first use.
#
# The controller is stored in +@drain+, so once this has been called
# +run_once+ sees a truthy +@drain+ and takes its drain branch.
#
# @return [GrpcKit::Session::DrainController] the memoized controller
def drain
  return @drain if @drain

  @drain = GrpcKit::Session::DrainController.new
end

#run_once ⇒ bool

Returns whether the session can continue.

Returns:

  • (bool)

    whether the session can continue



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/grpc_kit/session/server_session.rb', line 50

# Performs a single read/write step of the session.
#
# Every exit path returns a real boolean. The drain path with no streams
# left previously fell out through a bare +return+ and yielded +nil+.
#
# @return [Boolean] +true+ when the caller may call this again, +false+ when
#   the session is finished (peer shut down, stopped, nothing left to read or
#   write, drained with no streams left, or the connection failed)
def run_once
  if @peer_shutdowned || @stop || !(want_read? || want_write?)
    # it could be called twice
    @streams.each_value(&:close)
    return false
  end

  if @drain
    # A drain was requested and no streams remain: finish the session now.
    if @streams.empty?
      shutdown
      return false
    end

    @drain.call(self)
  end

  do_read if want_read?
  send if want_write?

  true
rescue Errno::ECONNRESET, IOError => e
  # A reset or IO failure ends the session; log it and report "cannot continue".
  GrpcKit.logger.debug(e.message)
  shutdown
  false
end

#shutdown ⇒ Object



85
86
87
88
89
90
# File 'lib/grpc_kit/session/server_session.rb', line 85

# Stops the session and closes its IO on a best-effort basis.
#
# Any StandardError raised by +stop+ or by closing the IO is logged at debug
# level rather than propagated to the caller.
def shutdown
  begin
    stop
    @io.close
  rescue => error
    GrpcKit.logger.debug(error)
  end
end

#start ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/grpc_kit/session/server_session.rb', line 31

# Drives the session loop until it can no longer continue.
#
# Each iteration calls +invoke+, then +run_once+. The loop ends when
# +run_once+ returns a falsy value, or when there are no streams and the IO
# does not become readable (in which case the session is shut down first).
# A debug line is always logged on the way out, even if an error escapes.
def start
  loop do
    invoke

    # With no streams, wait on the IO; a falsy wait result ends the session.
    if @streams.count.zero? && !@io.wait_readable
      shutdown
      break
    end

    break unless run_once
  end
ensure
  GrpcKit.logger.debug('Finish server session')
end