Class: Arf::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/context.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(req_id, log, stream) ⇒ Context

Returns a new instance of Context.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/arf/context.rb', line 10

def initialize(req_id, log, stream)
  @request_id = req_id
  @log = log
  @stream = stream

  @request = nil
  @error = nil

  @has_recv_stream = false
  @recv_stream_error = nil
  @recv_stream_started_lock = Monitor.new
  @recv_stream_started = false

  @has_send_stream = false
  @send_stream_error = nil
  @send_stream_started = false
  @send_stream_started_lock = Monitor.new
  @send_stream_finished = false
  @send_stream_type = nil

  @has_sent_response = false
  @response = Arf::RPC::Response.new
end

Instance Attribute Details

#errorObject

Returns the value of attribute error.



5
6
7
# File 'lib/arf/context.rb', line 5

def error
  @error
end

#has_recv_streamObject

Returns the value of attribute has_recv_stream.



5
6
7
# File 'lib/arf/context.rb', line 5

def has_recv_stream
  @has_recv_stream
end

#has_send_streamObject

Returns the value of attribute has_send_stream.



5
6
7
# File 'lib/arf/context.rb', line 5

def has_send_stream
  @has_send_stream
end

#has_sent_responseObject

Returns the value of attribute has_sent_response.



5
6
7
# File 'lib/arf/context.rb', line 5

def has_sent_response
  @has_sent_response
end

#logObject

Returns the value of attribute log.



5
6
7
# File 'lib/arf/context.rb', line 5

def log
  @log
end

#recv_stream_errorObject

Returns the value of attribute recv_stream_error.



5
6
7
# File 'lib/arf/context.rb', line 5

def recv_stream_error
  @recv_stream_error
end

#recv_stream_startedObject

Returns the value of attribute recv_stream_started.



5
6
7
# File 'lib/arf/context.rb', line 5

def recv_stream_started
  @recv_stream_started
end

#requestObject

Returns the value of attribute request.



5
6
7
# File 'lib/arf/context.rb', line 5

def request
  @request
end

#request_idObject

Returns the value of attribute request_id.



5
6
7
# File 'lib/arf/context.rb', line 5

def request_id
  @request_id
end

#responseObject

Returns the value of attribute response.



5
6
7
# File 'lib/arf/context.rb', line 5

def response
  @response
end

#send_stream_errorObject

Returns the value of attribute send_stream_error.



5
6
7
# File 'lib/arf/context.rb', line 5

def send_stream_error
  @send_stream_error
end

#send_stream_finishedObject

Returns the value of attribute send_stream_finished.



5
6
7
# File 'lib/arf/context.rb', line 5

def send_stream_finished
  @send_stream_finished
end

#send_stream_startedObject

Returns the value of attribute send_stream_started.



5
6
7
# File 'lib/arf/context.rb', line 5

def send_stream_started
  @send_stream_started
end

#send_stream_typeObject

Returns the value of attribute send_stream_type.



5
6
7
# File 'lib/arf/context.rb', line 5

def send_stream_type
  @send_stream_type
end

#streamObject

Returns the value of attribute stream.



5
6
7
# File 'lib/arf/context.rb', line 5

def stream
  @stream
end

Instance Method Details

#end_sendObject

Raises:

  • (RPC::NoStreamError)


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/arf/context.rb', line 90

def end_send
  return if @send_stream_finished
  raise RPC::NoStreamError, false unless @has_send_stream
  raise @error if @error
  raise @send_stream_error if @send_stream_error

  @send_stream_finished = true

  begin
    @stream.write_data(RPC::BaseMessage.encode(RPC::EndStream.new), end_stream: true)
  rescue StandardError => e
    @log.error("Failed running #end_send", e)
    @error = e
    raise
  end
end

#prepareObject



34
35
36
37
# File 'lib/arf/context.rb', line 34

def prepare
  @has_recv_stream = @request.streaming
  @response..set("arf-request-id", @request_id)
end

#recvObject

Raises:

  • (RPC::NoStreamError)


39
40
41
42
43
44
45
46
47
48
# File 'lib/arf/context.rb', line 39

def recv
  raise RPC::NoStreamError, true unless has_recv_stream
  raise @error if @error
  raise @recv_stream_error if @recv_stream_error

  loop do
    v = _read_stream_item
    return v if v
  end
end

#send_response(end_stream: false) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/arf/context.rb', line 107

def send_response(end_stream: false)
  @log.debug("Sending response", response: @response)

  @has_sent_response = true
  @send_stream_started = false

  @response.status ||= :ok
  @log.debug("Start stream write data")
  @stream.write_data(RPC::BaseMessage.encode(@response), end_stream:)
  @log.debug("Send response done")
rescue Exception => e
  @log.error("Failed sending response", e)
  @error = e
  raise
end

#send_stream_metadataObject



86
87
88
# File 'lib/arf/context.rb', line 86

def 
  @stream.write_data(RPC::BaseMessage.encode(RPC::StreamMetadata.new(metadata: @response.)))
end

#stream_send(val) ⇒ Object

Raises:

  • (RPC::NoStreamError)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/arf/context.rb', line 50

def stream_send(val)
  raise RPC::NoStreamError, false unless @has_send_stream
  raise @error if @error
  raise @send_stream_error if @send_stream_error

  unless has_sent_response
    # Arf::RPC::Service makes sure this method can be called now. If it
    # allowed calling without a response being sent, it is acceptable to
    # push a response as-is, as the client does not expect parameters as
    # response other than the stream being sent.
    @log.debug("Pushing synthetic response for method without response values")
    @response.status = :ok
    @response.streaming = true
    @response.params = []
    send_response
  end

  begin
    unless @send_stream_started
      @send_stream_started_lock.synchronize do
        next if @send_stream_started

        @log.debug("Pushing StartStream frame")
        @stream.write_data(RPC::BaseMessage.encode(RPC::StartStream.new))
        @send_stream_started = true
      end
    end

    @log.debug("Pushing StreamItem frame")
    @stream.write_data(RPC::BaseMessage.encode(RPC::StreamItem.new(value: val)))
  rescue StandardError => e
    @send_stream_error = e
    raise
  end
end