Class: EventStore::HTTP::ReadStream::Substitute::ReadStream

Inherits:
Object
  • Object
show all
Defined in:
lib/event_store/http/read_stream/substitute.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#body_embed_enabledObject Also known as: body_embed_enabled?

Returns the value of attribute body_embed_enabled.



12
13
14
# File 'lib/event_store/http/read_stream/substitute.rb', line 12

def body_embed_enabled
  @body_embed_enabled
end

#long_poll_durationObject

Returns the value of attribute long_poll_duration.



13
14
15
# File 'lib/event_store/http/read_stream/substitute.rb', line 13

def long_poll_duration
  @long_poll_duration
end

#rich_embed_enabledObject Also known as: rich_embed_enabled?

Returns the value of attribute rich_embed_enabled.



14
15
16
# File 'lib/event_store/http/read_stream/substitute.rb', line 14

def rich_embed_enabled
  @rich_embed_enabled
end

Class Method Details

.buildObject



18
19
20
21
22
23
24
25
26
27
# File 'lib/event_store/http/read_stream/substitute.rb', line 18

def self.build
  telemetry_sink = Telemetry::Sink.new

  instance = new telemetry_sink

  telemetry = ::Telemetry.configure instance
  telemetry.register telemetry_sink

  instance
end

Instance Method Details

#call(stream, position: nil, direction: nil, batch_size: nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/event_store/http/read_stream/substitute.rb', line 29

def call(stream, position: nil, direction: nil, batch_size: nil)
  unless direction.nil?
    unless EventStore::HTTP::ReadStream.directions.include? direction
      raise ArgumentError
    end
  end

  page = streams.fetch stream do
    raise EventStore::HTTP::ReadStream::StreamNotFoundError
  end

  telemetry.record :read, Telemetry::Read.new(stream, position, direction, batch_size)

  page
end

#embed_bodyObject



70
71
72
# File 'lib/event_store/http/read_stream/substitute.rb', line 70

def embed_body
  self.body_embed_enabled = true
end

#embed_richObject



66
67
68
# File 'lib/event_store/http/read_stream/substitute.rb', line 66

def embed_rich
  self.rich_embed_enabled = true
end

#enable_long_poll(duration = nil) ⇒ Object



53
54
55
56
57
# File 'lib/event_store/http/read_stream/substitute.rb', line 53

def enable_long_poll(duration=nil)
  duration ||= HTTP::ReadStream::Defaults.long_poll_duration

  self.long_poll_duration = duration
end

#long_poll_enabled?(duration = nil) ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
62
63
64
# File 'lib/event_store/http/read_stream/substitute.rb', line 59

def long_poll_enabled?(duration=nil)
  return false if long_poll_duration.nil?
  return true if duration.nil?

  long_poll_duration == duration
end

#read?(&block) ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
48
49
50
51
# File 'lib/event_store/http/read_stream/substitute.rb', line 45

def read?(&block)
  block ||= proc { true }

  telemetry_sink.recorded? do |record|
    block.(*record.data.to_a)
  end
end

#set_response(stream, page, position: nil) ⇒ Object



74
75
76
# File 'lib/event_store/http/read_stream/substitute.rb', line 74

def set_response(stream, page, position: nil)
  streams[stream] = page
end

#streamsObject



78
79
80
# File 'lib/event_store/http/read_stream/substitute.rb', line 78

def streams
  @streams ||= {}
end