Class: EventStore::HTTP::ReadStream

Inherits:
Object
  • Object
show all
Includes:
Request, Log::Dependency
Defined in:
lib/event_store/http/read_stream.rb,
lib/event_store/http/read_stream/defaults.rb,
lib/event_store/http/read_stream/log_text.rb,
lib/event_store/http/read_stream/substitute.rb

Defined Under Namespace

Modules: Defaults, LogText, Substitute

Constant Summary collapse

Error =
Class.new StandardError
StreamNotFoundError =
Class.new Error

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Request

included

Instance Attribute Details

#embed ⇒ Object

Returns the value of attribute embed.



10
11
12
# File 'lib/event_store/http/read_stream.rb', line 10

# Reader for the embed mode: returns the current value of @embed.
# default_output_schema switches on this value (:body / :rich / anything else)
# and slice_path appends it to the request path as `?embed=<value>` when truthy.
def embed
  @embed
end

#long_poll_duration ⇒ Object

Returns the value of attribute long_poll_duration.



9
10
11
# File 'lib/event_store/http/read_stream.rb', line 9

# Reader for @long_poll_duration.
# When the value is truthy, #call sends it (via to_s) as the `ES-LongPoll`
# request header; when nil/false the header is omitted.
def long_poll_duration
  @long_poll_duration
end

#output_schema ⇒ Object



14
15
16
# File 'lib/event_store/http/read_stream.rb', line 14

# Schema handed to Transform::Read when a response body is decoded.
#
# Returns the value already stored in @output_schema when it is truthy;
# otherwise asks default_output_schema for one, stores it and returns it.
def output_schema
  return @output_schema if @output_schema

  @output_schema = default_output_schema
end

Class Method Details

.directions ⇒ Object



94
95
96
# File 'lib/event_store/http/read_stream.rb', line 94

# Read directions accepted by #call, as symbols.
# A new array is built on every invocation.
def self.directions
  %i[forward backward]
end

Instance Method Details

#call(stream, position: nil, batch_size: nil, direction: nil) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/event_store/http/read_stream.rb', line 18

# Reads one slice of a stream over HTTP and returns the transformed page.
#
# Keyword arguments left nil (or false) fall back to the values supplied by
# Defaults. The request is a GET on the path built by slice_path, sent with an
# `Accept` header of MediaTypes::Atom.mime and, when long_poll_duration is
# set, an `ES-LongPoll` header.
#
# @param stream stream identifier interpolated into the request path
# @param position position segment of the slice path (Defaults.position when nil)
# @param batch_size batch size segment of the slice path (Defaults.batch_size when nil)
# @param direction one of self.class.directions (Defaults.direction when nil)
# @return the result of Transform::Read for the response body, read as :json
#   against output_schema
# @raise [ArgumentError] when direction is not included in self.class.directions
# @raise [StreamNotFoundError] when the response is a Net::HTTPNotFound
# @raise [Error] for every other non-success response
def call(stream, position: nil, batch_size: nil, direction: nil)
  batch_size ||= Defaults.batch_size
  position ||= Defaults.position
  direction ||= Defaults.direction

  logger.trace { "Reading stream (#{LogText.attributes stream, position, batch_size, direction})" }

  unless self.class.directions.include? direction
    error_message = "Invalid direction; not `forward' or `backward' (#{LogText.attributes stream, position, batch_size, direction})"
    # Logged at error level, like the other failure paths below, so that a
    # rejected read is visible without trace logging enabled.
    logger.error error_message
    raise ArgumentError, error_message
  end

  slice_path = self.slice_path stream, position, batch_size, direction

  request = Net::HTTP::Get.new slice_path
  request['Accept'] = MediaTypes::Atom.mime
  # Header is only sent when long polling has been enabled.
  request['ES-LongPoll'] = long_poll_duration.to_s if long_poll_duration

  response = connection.request request

  case response
  when Net::HTTPSuccess
    page = Transform::Read.(response.body, :json, output_schema)

    logger.info { "Stream read (#{LogText.attributes stream, position, batch_size, direction, response: response}, OutputSchema: #{output_schema})" }

    page

  when Net::HTTPNotFound
    error_message = "Stream not found (#{LogText.attributes stream, position, batch_size, direction, response: response})"
    logger.error error_message
    raise StreamNotFoundError, error_message

  else
    # Every response class other than success / not-found ends up here.
    error_message = "Client error (#{LogText.attributes stream, position, batch_size, direction, response: response})"
    logger.error error_message
    raise Error, error_message
  end
end

#default_output_schema ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/event_store/http/read_stream.rb', line 69

# Picks the Atom page schema that corresponds to the current embed mode:
# :body maps to Embed::Body, :rich maps to Embed::Rich, and any other value
# (including nil) maps to Embed::None.
def default_output_schema
  schemas = MediaTypes::Atom::Page::Embed

  case embed
  when :body then schemas::Body
  when :rich then schemas::Rich
  else schemas::None
  end
end

#embed_body ⇒ Object



90
91
92
# File 'lib/event_store/http/read_stream.rb', line 90

# Sets the embed mode to :body through the embed= writer.
# With this mode slice_path appends `?embed=body` and default_output_schema
# selects MediaTypes::Atom::Page::Embed::Body.
# NOTE(review): output_schema is memoized, so this presumably needs to run
# before the first read for the schema to follow — confirm with callers.
def embed_body
  self.embed = :body
end

#embed_rich ⇒ Object



86
87
88
# File 'lib/event_store/http/read_stream.rb', line 86

# Sets the embed mode to :rich through the embed= writer.
# With this mode slice_path appends `?embed=rich` and default_output_schema
# selects MediaTypes::Atom::Page::Embed::Rich.
# NOTE(review): output_schema is memoized, so this presumably needs to run
# before the first read for the schema to follow — confirm with callers.
def embed_rich
  self.embed = :rich
end

#enable_long_poll(duration = nil) ⇒ Object



80
81
82
83
84
# File 'lib/event_store/http/read_stream.rb', line 80

# Turns on long polling for subsequent reads by storing a duration through
# the long_poll_duration= writer.
#
# @param duration value to store; Defaults.long_poll_duration is used when
#   it is nil or false
# @return the duration that was stored
def enable_long_poll(duration=nil)
  self.long_poll_duration = duration || Defaults.long_poll_duration
end

#slice_path(stream, position, batch_size, direction) ⇒ Object



59
60
61
62
63
64
65
66
67
# File 'lib/event_store/http/read_stream.rb', line 59

# Builds the request path for one slice of a stream:
# `/streams/<stream>/<position>/<direction>/<batch_size>`, followed by
# `?embed=<embed>` when an embed mode is set.
#
# @param stream stream identifier (interpolated as-is, no escaping is applied)
# @param position position segment
# @param batch_size batch size segment (last path segment)
# @param direction direction segment
# @return [String] the request path
def slice_path(stream, position, batch_size, direction)
  base_path = "/streams/#{stream}/#{position}/#{direction}/#{batch_size}"
  return base_path unless embed

  "#{base_path}?embed=#{embed}"
end