Class: MessageStore::Postgres::Get::Stream

Inherits:
Object
  • Object
show all
Includes:
MessageStore::Postgres::Get
Defined in:
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/stream/last.rb

Defined Under Namespace

Modules: Defaults Classes: Last

Constant Summary collapse

Error =
Class.new(RuntimeError)

Class Method Summary collapse

Instance Method Summary collapse

Methods included from MessageStore::Postgres::Get

#convert, error_message, #get_result, included, message_data, #raise_error, specialization

Class Method Details

.build(stream_name, batch_size: nil, condition: nil, session: nil) ⇒ Object



16
17
18
19
20
# File 'lib/message_store/postgres/get/stream.rb', line 16

def self.build(stream_name, batch_size: nil, condition: nil, session: nil)
  instance = new(stream_name, batch_size, condition)
  instance.configure(session: session)
  instance
end

.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



11
12
13
14
# File 'lib/message_store/postgres/get/stream.rb', line 11

def self.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil)
  instance = build(stream_name, batch_size: batch_size, condition: condition, session: session)
  instance.(position)
end

.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



22
23
24
25
26
# File 'lib/message_store/postgres/get/stream.rb', line 22

def self.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil)
  attr_name ||= :get
  instance = build(stream_name, batch_size: batch_size, condition: condition, session: session)
  receiver.public_send("#{attr_name}=", instance)
end

Instance Method Details

#assureObject



57
58
59
60
61
# File 'lib/message_store/postgres/get/stream.rb', line 57

def assure
  if MessageStore::StreamName.category?(stream_name)
    raise Error, "Must be a stream name (Category: #{stream_name})"
  end
end

#configure(session: nil) ⇒ Object



28
29
30
# File 'lib/message_store/postgres/get/stream.rb', line 28

def configure(session: nil)
  Session.configure(self, session: session)
end

#last_position(batch) ⇒ Object



49
50
51
# File 'lib/message_store/postgres/get/stream.rb', line 49

def last_position(batch)
  batch.last.position
end

#log_text(stream_name, position) ⇒ Object



53
54
55
# File 'lib/message_store/postgres/get/stream.rb', line 53

def log_text(stream_name, position)
  "Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Correlation: #{correlation.inspect}, Condition: #{condition.inspect})"
end

#parameter_values(stream_name, position) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/message_store/postgres/get/stream.rb', line 40

def parameter_values(stream_name, position)
  [
    stream_name,
    position,
    batch_size,
    condition
  ]
end

#parametersObject



36
37
38
# File 'lib/message_store/postgres/get/stream.rb', line 36

def parameters
  '$1::varchar, $2::bigint, $3::bigint, $4::varchar'
end

#sql_commandObject



32
33
34
# File 'lib/message_store/postgres/get/stream.rb', line 32

def sql_command
  "SELECT * FROM get_stream_messages(#{parameters});"
end