Class: Exa::Internal::Transport::Stream
- Inherits:
-
Object
- Object
- Exa::Internal::Transport::Stream
- Includes:
- Enumerable
- Defined in:
- lib/exa/internal/transport/stream.rb
Instance Method Summary collapse
- #close ⇒ Object
- #content_type ⇒ Object
- #each(&blk) ⇒ Object
- #each_event(&blk) ⇒ Object
- #each_event_json(symbolize: true, &blk) ⇒ Object
- #each_json_line(symbolize: true, &blk) ⇒ Object
- #each_line(&blk) ⇒ Object
-
#initialize(headers:, stream:) ⇒ Stream
constructor
A new instance of Stream.
Constructor Details
#initialize(headers:, stream:) ⇒ Stream
Returns a new instance of Stream.
11 12 13 14 |
# File 'lib/exa/internal/transport/stream.rb', line 11 def initialize(headers:, stream:) @headers = headers @stream = stream end |
Instance Method Details
#close ⇒ Object
64 65 66 |
# File 'lib/exa/internal/transport/stream.rb', line 64 def close Exa::Internal::Util.close_fused!(@stream) end |
#content_type ⇒ Object
68 69 70 |
# File 'lib/exa/internal/transport/stream.rb', line 68 def content_type @headers["content-type"] end |
#each(&blk) ⇒ Object
16 17 18 19 |
# File 'lib/exa/internal/transport/stream.rb', line 16 def each(&blk) return enum_for(__method__) unless block_given? @stream.each(&blk) end |
#each_event(&blk) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/exa/internal/transport/stream.rb', line 41 def each_event(&blk) return enum_for(__method__) unless block_given? sse = Exa::Internal::Util.decode_sse(@stream) begin sse.each do |event| payload = event[:data] payload = payload.chomp if payload yield(event.merge(data: payload)) end ensure close end end |
#each_event_json(symbolize: true, &blk) ⇒ Object
55 56 57 58 59 60 61 62 |
# File 'lib/exa/internal/transport/stream.rb', line 55 def each_event_json(symbolize: true, &blk) return enum_for(__method__, symbolize: symbolize) unless block_given? each_event do |event| data = event[:data] next if data.nil? || data.empty? yield event.merge(data: JSON.parse(data, symbolize_names: symbolize)) end end |
#each_json_line(symbolize: true, &blk) ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/exa/internal/transport/stream.rb', line 33 def each_json_line(symbolize: true, &blk) return enum_for(__method__, symbolize: symbolize) unless block_given? each_line do |line| next if line.strip.empty? yield JSON.parse(line, symbolize_names: symbolize) end end |
#each_line(&blk) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/exa/internal/transport/stream.rb', line 21 def each_line(&blk) return enum_for(__method__) unless block_given? enumerator = Exa::Internal::Util.decode_lines(@stream) begin enumerator.each do |line| yield line.chomp end ensure close end end |