Class: Arf::Wire::Stream
- Inherits:
-
Object
- Object
- Arf::Wire::Stream
- Defined in:
- lib/arf/wire/stream.rb,
lib/arf/wire/stream/state.rb
Defined Under Namespace
Classes: State
Instance Attribute Summary collapse
-
#external_id ⇒ Object
Returns the value of attribute external_id.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
- #attach_handler(handler) ⇒ Object
- #close_local ⇒ Object
- #handle_data(fr) ⇒ Object
- #handle_reset_stream(fr) ⇒ Object
-
#initialize(id, driver) ⇒ Stream
constructor
A new instance of Stream.
- #read ⇒ Object
- #read_blocking ⇒ Object
- #reset(code) ⇒ Object
- #reset!(code) ⇒ Object
- #write(type) ⇒ Object
- #write_data(value, end_stream: false) ⇒ Object
Constructor Details
#initialize(id, driver) ⇒ Stream
Returns a new instance of Stream.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/arf/wire/stream.rb', line 11 def initialize(id, driver) @id = id @driver = driver @state = State.new @tmp_data = StringIO.new @to_read_monitor = Monitor.new @to_read = Queue.new @handler = nil @log = Arf.logger.with_fields(component: "Stream", driver: @driver.class.name) end |
Instance Attribute Details
#external_id ⇒ Object
Returns the value of attribute external_id.
9 10 11 |
# File 'lib/arf/wire/stream.rb', line 9 def external_id @external_id end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
8 9 10 |
# File 'lib/arf/wire/stream.rb', line 8 def id @id end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
8 9 10 |
# File 'lib/arf/wire/stream.rb', line 8 def state @state end |
Instance Method Details
#attach_handler(handler) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/arf/wire/stream.rb', line 22 def attach_handler(handler) @log.debug("Attached handler", handler:) @handler = handler @to_read_monitor.synchronize do @handler.handle_data(@to_read.shift) until @to_read.empty? end end |
#close_local ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/arf/wire/stream.rb', line 78 def close_local @state.may_send_data? @state.close_local write(DataFrame) do |fr| fr.end_data! fr.end_stream! end end |
#handle_data(fr) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/arf/wire/stream.rb', line 39 def handle_data(fr) @state.may_receive_data? data = case fr.payload when StringIO then fr.payload.string when String then fr.payload else fr.payload.to_s end @tmp_data.write(data) if fr.end_data? && @tmp_data.length.positive? @tmp_data.rewind @to_read_monitor.synchronize do if @handler && @to_read.empty? @handler.handle_data(@tmp_data) else @to_read << @tmp_data end @tmp_data = StringIO.new end end @state.close_remote if fr.end_stream? rescue ClosedStreamError reset!(Wire::ERROR_CODE_STREAM_CLOSED) end |
#handle_reset_stream(fr) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/arf/wire/stream.rb', line 30 def handle_reset_stream(fr) # return if @state.closed? # Ignore resets when stream is closed @state.may_reset_stream? @state.close @state.error = StreamResetError.new(fr.error_code) rescue ClosedStreamError reset!(Wire::ERROR_CODE_STREAM_CLOSED) end |
#read ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/arf/wire/stream.rb', line 87 def read raise @state.error if @state.error begin val = @to_read.shift(true) return val if val rescue ThreadError nil end @state.may_receive_data? nil end |
#read_blocking ⇒ Object
101 102 103 104 105 106 107 108 |
# File 'lib/arf/wire/stream.rb', line 101 def read_blocking raise @state.error if @state.error val = @to_read.shift return val if val nil end |
#reset(code) ⇒ Object
72 73 74 75 76 |
# File 'lib/arf/wire/stream.rb', line 72 def reset(code) @state.may_send_reset_stream? @state.close reset!(code) end |
#reset!(code) ⇒ Object
65 66 67 68 69 70 |
# File 'lib/arf/wire/stream.rb', line 65 def reset!(code) write(ResetStreamFrame) do |fr| fr.stream_id = @id fr.error_code = code end end |
#write(type) ⇒ Object
110 111 112 113 114 115 |
# File 'lib/arf/wire/stream.rb', line 110 def write(type, &) @log.debug("Dispatching", type:) inst = type.new(&) inst.stream_id = @id if inst.respond_to?(:stream_id=) @driver.dispatch(inst) end |
#write_data(value, end_stream: false) ⇒ Object
117 118 119 120 121 122 123 124 125 |
# File 'lib/arf/wire/stream.rb', line 117 def write_data(value, end_stream: false) @state.may_send_data? @log.dump("Writing", value, source: caller[0]) Wire.data_frames_from_buffer(@id, value, end_stream:).each do |fr| @driver.dispatch(fr) end @state.close_local if end_stream end |