Class: Arf::Wire::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/wire/stream.rb,
lib/arf/wire/stream/state.rb

Defined Under Namespace

Classes: State

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, driver) ⇒ Stream

Returns a new instance of Stream.



11
12
13
14
15
16
17
18
19
20
# File 'lib/arf/wire/stream.rb', line 11

def initialize(id, driver)
  @id = id
  @driver = driver
  @state = State.new
  @tmp_data = StringIO.new
  @to_read_monitor = Monitor.new
  @to_read = Queue.new
  @handler = nil
  @log = Arf.logger.with_fields(component: "Stream", driver: @driver.class.name)
end

Instance Attribute Details

#external_idObject

Returns the value of attribute external_id.



9
10
11
# File 'lib/arf/wire/stream.rb', line 9

def external_id
  @external_id
end

#idObject (readonly)

Returns the value of attribute id.



8
9
10
# File 'lib/arf/wire/stream.rb', line 8

def id
  @id
end

#stateObject (readonly)

Returns the value of attribute state.



8
9
10
# File 'lib/arf/wire/stream.rb', line 8

def state
  @state
end

Instance Method Details

#attach_handler(handler) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/arf/wire/stream.rb', line 22

def attach_handler(handler)
  @log.debug("Attached handler", handler:)
  @handler = handler
  @to_read_monitor.synchronize do
    @handler.handle_data(@to_read.shift) until @to_read.empty?
  end
end

#close_localObject



78
79
80
81
82
83
84
85
# File 'lib/arf/wire/stream.rb', line 78

def close_local
  @state.may_send_data?
  @state.close_local
  write(DataFrame) do |fr|
    fr.end_data!
    fr.end_stream!
  end
end

#handle_data(fr) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/arf/wire/stream.rb', line 39

def handle_data(fr)
  @state.may_receive_data?
  data = case fr.payload
         when StringIO then fr.payload.string
         when String then fr.payload
         else fr.payload.to_s
         end

  @tmp_data.write(data)

  if fr.end_data? && @tmp_data.length.positive?
    @tmp_data.rewind
    @to_read_monitor.synchronize do
      if @handler && @to_read.empty?
        @handler.handle_data(@tmp_data)
      else
        @to_read << @tmp_data
      end
      @tmp_data = StringIO.new
    end
  end
  @state.close_remote if fr.end_stream?
rescue ClosedStreamError
  reset!(Wire::ERROR_CODE_STREAM_CLOSED)
end

#handle_reset_stream(fr) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/arf/wire/stream.rb', line 30

def handle_reset_stream(fr)
  # return if @state.closed? # Ignore resets when stream is closed
  @state.may_reset_stream?
  @state.close
  @state.error = StreamResetError.new(fr.error_code)
rescue ClosedStreamError
  reset!(Wire::ERROR_CODE_STREAM_CLOSED)
end

#readObject



87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/arf/wire/stream.rb', line 87

def read
  raise @state.error if @state.error

  begin
    val = @to_read.shift(true)
    return val if val
  rescue ThreadError
    nil
  end

  @state.may_receive_data?
  nil
end

#read_blockingObject



101
102
103
104
105
106
107
108
# File 'lib/arf/wire/stream.rb', line 101

def read_blocking
  raise @state.error if @state.error

  val = @to_read.shift
  return val if val

  nil
end

#reset(code) ⇒ Object



72
73
74
75
76
# File 'lib/arf/wire/stream.rb', line 72

def reset(code)
  @state.may_send_reset_stream?
  @state.close
  reset!(code)
end

#reset!(code) ⇒ Object



65
66
67
68
69
70
# File 'lib/arf/wire/stream.rb', line 65

def reset!(code)
  write(ResetStreamFrame) do |fr|
    fr.stream_id = @id
    fr.error_code = code
  end
end

#write(type) ⇒ Object



110
111
112
113
114
115
# File 'lib/arf/wire/stream.rb', line 110

def write(type, &)
  @log.debug("Dispatching", type:)
  inst = type.new(&)
  inst.stream_id = @id if inst.respond_to?(:stream_id=)
  @driver.dispatch(inst)
end

#write_data(value, end_stream: false) ⇒ Object



117
118
119
120
121
122
123
124
125
# File 'lib/arf/wire/stream.rb', line 117

def write_data(value, end_stream: false)
  @state.may_send_data?

  @log.dump("Writing", value, source: caller[0])
  Wire.data_frames_from_buffer(@id, value, end_stream:).each do |fr|
    @driver.dispatch(fr)
  end
  @state.close_local if end_stream
end