Class: Arf::Context
- Inherits: Object
  - Object
    - Arf::Context
- Defined in: lib/arf/context.rb
Instance Attribute Summary collapse
-
#error ⇒ Object
Returns the value of attribute error.
-
#has_recv_stream ⇒ Object
Returns the value of attribute has_recv_stream.
-
#has_send_stream ⇒ Object
Returns the value of attribute has_send_stream.
-
#has_sent_response ⇒ Object
Returns the value of attribute has_sent_response.
-
#log ⇒ Object
Returns the value of attribute log.
-
#recv_stream_error ⇒ Object
Returns the value of attribute recv_stream_error.
-
#recv_stream_started ⇒ Object
Returns the value of attribute recv_stream_started.
-
#request ⇒ Object
Returns the value of attribute request.
-
#request_id ⇒ Object
Returns the value of attribute request_id.
-
#response ⇒ Object
Returns the value of attribute response.
-
#send_stream_error ⇒ Object
Returns the value of attribute send_stream_error.
-
#send_stream_finished ⇒ Object
Returns the value of attribute send_stream_finished.
-
#send_stream_started ⇒ Object
Returns the value of attribute send_stream_started.
-
#send_stream_type ⇒ Object
Returns the value of attribute send_stream_type.
-
#stream ⇒ Object
Returns the value of attribute stream.
Instance Method Summary collapse
- #end_send ⇒ Object
-
#initialize(req_id, log, stream) ⇒ Context
constructor
A new instance of Context.
- #prepare ⇒ Object
- #recv ⇒ Object
- #send_response(end_stream: false) ⇒ Object
- #send_stream_metadata ⇒ Object
- #stream_send(val) ⇒ Object
Constructor Details
#initialize(req_id, log, stream) ⇒ Context
Returns a new instance of Context.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/arf/context.rb', line 10

# Builds a context with every piece of request, streaming and response state
# set to its initial value.
#
# @param req_id [Object] stored as +@request_id+
# @param log [Object] stored as +@log+; other methods of this class call
#   +debug+ and +error+ on it
# @param stream [Object] stored as +@stream+; other methods of this class call
#   +write_data+ on it
def initialize(req_id, log, stream)
  @request_id = req_id
  @log = log
  @stream = stream
  @request = nil
  @error = nil

  # Receive-side stream state.
  @has_recv_stream = false
  @recv_stream_error = nil
  @recv_stream_started_lock = Monitor.new
  @recv_stream_started = false

  # Send-side stream state.
  @has_send_stream = false
  @send_stream_error = nil
  @send_stream_started = false
  @send_stream_started_lock = Monitor.new
  @send_stream_finished = false
  @send_stream_type = nil

  # Response state; the response object is created empty.
  @has_sent_response = false
  @response = Arf::RPC::Response.new
end
Instance Attribute Details
#error ⇒ Object
Returns the value of attribute error.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@error+
def error
  @error
end
#has_recv_stream ⇒ Object
Returns the value of attribute has_recv_stream.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@has_recv_stream+
def has_recv_stream
  @has_recv_stream
end
#has_send_stream ⇒ Object
Returns the value of attribute has_send_stream.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@has_send_stream+
def has_send_stream
  @has_send_stream
end
#has_sent_response ⇒ Object
Returns the value of attribute has_sent_response.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@has_sent_response+
def has_sent_response
  @has_sent_response
end
#log ⇒ Object
Returns the value of attribute log.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@log+
def log
  @log
end
#recv_stream_error ⇒ Object
Returns the value of attribute recv_stream_error.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@recv_stream_error+
def recv_stream_error
  @recv_stream_error
end
#recv_stream_started ⇒ Object
Returns the value of attribute recv_stream_started.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@recv_stream_started+
def recv_stream_started
  @recv_stream_started
end
#request ⇒ Object
Returns the value of attribute request.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@request+
def request
  @request
end
#request_id ⇒ Object
Returns the value of attribute request_id.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@request_id+
def request_id
  @request_id
end
#response ⇒ Object
Returns the value of attribute response.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@response+
def response
  @response
end
#send_stream_error ⇒ Object
Returns the value of attribute send_stream_error.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@send_stream_error+
def send_stream_error
  @send_stream_error
end
#send_stream_finished ⇒ Object
Returns the value of attribute send_stream_finished.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@send_stream_finished+
def send_stream_finished
  @send_stream_finished
end
#send_stream_started ⇒ Object
Returns the value of attribute send_stream_started.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@send_stream_started+
def send_stream_started
  @send_stream_started
end
#send_stream_type ⇒ Object
Returns the value of attribute send_stream_type.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@send_stream_type+
def send_stream_type
  @send_stream_type
end
#stream ⇒ Object
Returns the value of attribute stream.
5 6 7 |
# File 'lib/arf/context.rb', line 5

# @return [Object] the current value of +@stream+
def stream
  @stream
end
Instance Method Details
#end_send ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/arf/context.rb', line 90

# Closes the outgoing stream by writing an encoded RPC::EndStream message with
# +end_stream: true+. Returns immediately, without writing, when the stream is
# already marked as finished.
#
# @return [Object, nil] nil when already finished; otherwise whatever
#   +@stream.write_data+ returns
# @raise [RPC::NoStreamError] when +@has_send_stream+ is not set
# @raise [Exception] the previously recorded +@error+ or +@send_stream_error+
# @raise [StandardError] a failure of the write itself, re-raised after being
#   logged and stored in +@error+
def end_send
  return if @send_stream_finished
  raise RPC::NoStreamError, false unless @has_send_stream
  raise @error if @error
  raise @send_stream_error if @send_stream_error

  # The flag is set before the write, so it stays set even if the write
  # below fails; a later call then returns early instead of retrying.
  @send_stream_finished = true
  begin
    @stream.write_data(RPC::BaseMessage.encode(RPC::EndStream.new), end_stream: true)
  rescue StandardError => e
    @log.error("Failed running #end_send", e)
    @error = e
    raise
  end
end
#prepare ⇒ Object
34 35 36 37 |
# File 'lib/arf/context.rb', line 34

# Copies the request's +streaming+ value into +@has_recv_stream+ and records
# the request id on the response under the "arf-request-id" key.
#
# NOTE(review): the extracted listing read `@response..set(...)`, which Ruby
# parses as a Range expression. The accessor is restored here as `metadata`,
# matching the `metadata: @response.` usage in #send_stream_metadata; confirm
# against lib/arf/context.rb.
#
# @return [Object] whatever the metadata object's +set+ returns
def prepare
  @has_recv_stream = @request.streaming
  @response.metadata.set("arf-request-id", @request_id)
end
#recv ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/arf/context.rb', line 39

# Returns the next item of the incoming stream.
#
# Calls +_read_stream_item+ repeatedly until it yields a truthy value; +nil+
# and +false+ results are skipped rather than returned.
#
# @return [Object] the first truthy value produced by +_read_stream_item+
# @raise [RPC::NoStreamError] when +has_recv_stream+ is not set
# @raise [Exception] the previously recorded +@error+ or +@recv_stream_error+
def recv
  raise RPC::NoStreamError, true unless has_recv_stream
  raise @error if @error
  raise @recv_stream_error if @recv_stream_error

  loop do
    item = _read_stream_item
    return item if item
  end
end
#send_response(end_stream: false) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/arf/context.rb', line 107

# Encodes +@response+ and writes it to the stream.
#
# Marks the response as sent, clears +@send_stream_started+ and defaults the
# response status to +:ok+ when none was set, all before the write happens.
#
# @param end_stream [Boolean] forwarded to +@stream.write_data+
# @return [Object] whatever the final +@log.debug+ call returns
# @raise [StandardError] a failure while sending, re-raised after being logged
#   and stored in +@error+
def send_response(end_stream: false)
  @log.debug("Sending response", response: @response)
  @has_sent_response = true
  # With this cleared, the next #stream_send pushes a StartStream frame.
  @send_stream_started = false
  @response.status ||= :ok
  @log.debug("Start stream write data")
  @stream.write_data(RPC::BaseMessage.encode(@response), end_stream: end_stream)
  @log.debug("Send response done")
rescue StandardError => e
  # StandardError (not Exception), matching #end_send and #stream_send, so
  # that Interrupt, SystemExit and similar pass through untouched.
  @log.error("Failed sending response", e)
  @error = e
  raise
end
#send_stream_metadata ⇒ Object
86 87 88 |
# File 'lib/arf/context.rb', line 86

# Writes an encoded RPC::StreamMetadata message carrying the response's
# metadata to the stream.
#
# NOTE(review): the extracted listing had lost the method name after `def` and
# the accessor after `@response.`. The name is restored from this page's
# method heading and the accessor as `metadata`, to match the `metadata:`
# keyword; confirm against lib/arf/context.rb.
#
# @return [Object] whatever +@stream.write_data+ returns
def send_stream_metadata
  @stream.write_data(RPC::BaseMessage.encode(RPC::StreamMetadata.new(metadata: @response.metadata)))
end
#stream_send(val) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/arf/context.rb', line 50

# Pushes one value to the outgoing stream as an RPC::StreamItem frame.
#
# If no response has been sent yet, a synthetic one is sent first. An
# RPC::StartStream frame is written ahead of the item unless
# +@send_stream_started+ is already set.
#
# @param val [Object] the value wrapped in the RPC::StreamItem
# @return [Object] whatever +@stream.write_data+ returns for the item frame
# @raise [RPC::NoStreamError] when +@has_send_stream+ is not set
# @raise [Exception] the previously recorded +@error+ or +@send_stream_error+
# @raise [StandardError] a failure while writing frames, re-raised after being
#   stored in +@send_stream_error+
def stream_send(val)
  raise RPC::NoStreamError, false unless @has_send_stream
  raise @error if @error
  raise @send_stream_error if @send_stream_error

  unless has_sent_response
    # Arf::RPC::Service makes sure this method can be called now. If it
    # allowed calling without a response being sent, it is acceptable to
    # push a response as-is, as the client does not expect parameters as
    # response other than the stream being sent.
    @log.debug("Pushing synthetic response for method without response values")
    @response.status = :ok
    @response.streaming = true
    @response.params = []
    send_response
  end

  begin
    unless @send_stream_started
      @send_stream_started_lock.synchronize do
        # Checked again while holding the lock, so the StartStream frame is
        # not written a second time if the flag was set in the meantime.
        next if @send_stream_started

        @log.debug("Pushing StartStream frame")
        @stream.write_data(RPC::BaseMessage.encode(RPC::StartStream.new))
        @send_stream_started = true
      end
    end

    @log.debug("Pushing StreamItem frame")
    @stream.write_data(RPC::BaseMessage.encode(RPC::StreamItem.new(value: val)))
  rescue StandardError => e
    @send_stream_error = e
    raise
  end
end