Class: GrpcKit::Session::ClientSession

Inherits:
DS9::Client
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/grpc_kit/session/client_session.rb

Defined Under Namespace

Classes: ConnectionClosing

Constant Summary collapse

MAX_STREAM_ID =
2**31 - 1

Instance Method Summary collapse

Constructor Details

#initialize(io, opts = {}) ⇒ ClientSession

Returns a new instance of ClientSession.

Parameters:



21
22
23
24
25
26
27
28
29
30
# File 'lib/grpc_kit/session/client_session.rb', line 21

# Builds a session around an already-established transport.
#
# @param io [Object] transport object; stored as-is and later asked to
#   `wait_readable` / `select` by the run loop
# @param opts [Hash] session options, stored untouched
def initialize(io, opts = {})
  super() # explicit parens: the parent initializer receives neither io nor opts

  @io = io
  @opts = opts
  @streams = {} # stream id => stream object

  # Every state flag starts out cleared.
  @draining = @stop = @no_write_data = false
end

Instance Method Details

#run_once ⇒ void

This method returns an undefined value.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/grpc_kit/session/client_session.rb', line 65

# Performs a single pass of session I/O.
#
# Does nothing once the session is stopped. While draining, raises as soon
# as the drain deadline has passed. Otherwise it either blocks until the
# transport is readable (when there is nothing to write) or asks the
# transport which directions are ready and services each of them.
#
# @raise [RuntimeError] when draining and @drain_time is in the past
# @return [void]
def run_once
  return if @stop

  raise 'trasport is closing' if @draining && @drain_time < Time.now

  if @no_write_data
    # Nothing is queued for sending, so only readability matters.
    @io.wait_readable
    do_read if want_read?
  else
    readable, writable = @io.select

    do_read if !readable.empty? && want_read?
    send if !writable.empty? && want_write?
  end
end

#send_request(headers) ⇒ void

This method returns an undefined value.

Parameters:

  • headers (Hash<String,String>)


34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/grpc_kit/session/client_session.rb', line 34

# Submits a new request and registers the stream that will carry it.
#
# @param headers [Hash<String,String>] request headers handed to submit_request
# @raise [ConnectionClosing] when the session is draining
# @return [GrpcKit::Session::Stream] the registered stream (last expression
#   of the method)
def send_request(headers)
  raise ConnectionClosing, "You can't send new request. becuase this connection will shuting down" if @draining

  # The stream is built with a placeholder id of 0; the real id is whatever
  # submit_request returns, and is patched in right afterwards.
  GrpcKit::Session::Stream.new(stream_id: 0).tap do |pending|
    assigned_id = submit_request(headers, pending.pending_send_data).to_i
    pending.stream_id = assigned_id
    @streams[assigned_id] = pending
    @no_write_data = false # there is now something to write
  end
end

#start(stream_id) ⇒ void

This method returns an undefined value.

Parameters:

  • stream_id (Integer)


49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/grpc_kit/session/client_session.rb', line 49

# Drives the session until the given stream is finished or the session has
# nothing left to read or write.
#
# A connection reset or IOError raised while running is logged at debug
# level and answered by shutting the session down instead of propagating.
#
# @param stream_id [Integer] id of a stream previously stored in @streams
# @raise [KeyError] when no stream is stored under stream_id
# @return [void]
def start(stream_id)
  target = @streams.fetch(stream_id)

  loop do
    idle = !(want_read? || want_write?)
    break if idle || target.close?

    run_once
  end
rescue Errno::ECONNRESET, IOError => error
  GrpcKit.logger.debug(error.message)
  shutdown
end