Module: MessageStore::Postgres::Get

Included in:
Category, Stream
Defined in:
lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/condition.rb,
lib/message_store/postgres/get/stream/last.rb,
lib/message_store/postgres/get/category/correlation.rb,
lib/message_store/postgres/get/category/consumer_group.rb

Defined Under Namespace

Modules: BatchSize, Call, Condition, Defaults, Deserialize, Time

Classes: Category, Stream

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(stream_name, **args) ⇒ Object



31
32
33
34
# File 'lib/message_store/postgres/get.rb', line 31

# Builds a reader by delegating to the specialization chosen for
# +stream_name+ (see .specialization).
#
# @param stream_name [String] name handed to the specialization's builder
# @param args [Hash] keyword arguments forwarded unchanged to the builder
# @return [Object] whatever the selected specialization's +build+ returns
def self.build(stream_name, **args)
  specialization(stream_name).build(stream_name, **args)
end

.call(stream_name, **args) ⇒ Object



48
49
50
51
52
# File 'lib/message_store/postgres/get.rb', line 48

# Builds a reader for +stream_name+ and invokes it immediately.
#
# The +:position+ keyword is not passed to the builder; it is handed to the
# built instance's +call+ instead. All other keywords go to .build.
#
# @param stream_name [String] name handed to .build
# @param args [Hash] builder keywords, plus an optional +:position+
# @return [Object] the result of calling the built instance
def self.call(stream_name, **args)
  position = args[:position]
  build_args = args.reject { |name, _value| name == :position }

  build(stream_name, **build_args).call(position)
end

.configure(receiver, stream_name, **args) ⇒ Object



36
37
38
39
40
41
42
# File 'lib/message_store/postgres/get.rb', line 36

# Builds a reader for +stream_name+ and assigns it to an attribute of
# +receiver+ through that attribute's public writer.
#
# @param receiver [Object] object that exposes the "<attr_name>=" writer
# @param stream_name [String] name handed to .build
# @param args [Hash] builder keywords, plus an optional +:attr_name+
#   naming the attribute to assign (falls back to +:get+ when absent or falsy)
# @return [Object] the return value of the writer call
def self.configure(receiver, stream_name, **args)
  attr_name = args.delete(:attr_name) || :get

  receiver.public_send("#{attr_name}=", build(stream_name, **args))
end

.error_message(pg_error) ⇒ Object



128
129
130
# File 'lib/message_store/postgres/get.rb', line 128

# Extracts the text of a database error: every occurrence of the literal
# 'ERROR:' is removed from the message and surrounding whitespace is stripped.
#
# @param pg_error [#message] the error whose message is cleaned up
# @return [String] the cleaned message text
def self.error_message(pg_error)
  raw_message = pg_error.message
  without_prefix = raw_message.gsub('ERROR:', '')

  without_prefix.strip
end

.included(cls) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/message_store/postgres/get.rb', line 4

# Ruby +included+ hook: runs when this module is included into +cls+ and
# evaluates the declarations below in the context of that class.
#
# @param cls [Module] the class (or module) including this module
def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    # Call and BatchSize are prepended rather than included, so their methods
    # come before the including class's own methods in method lookup.
    prepend Call
    prepend BatchSize

    # NOTE(review): `dependency` is a macro defined outside this file;
    # presumably it declares a `session` attribute of type Session — confirm.
    dependency :session, Session

    # NOTE(review): `protocol`, `abstract` and `virtual` are macros defined
    # outside this file; presumably they declare methods the including class
    # is expected to implement (or may optionally override) — confirm the
    # exact semantics of each against the library that provides them.
    protocol :stream_name
    abstract :sql_command
    abstract :parameters
    abstract :parameter_values
    protocol :last_position
    abstract :log_text

    virtual :specialize_error
    virtual :assure
  end
end

.message_data(record) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/message_store/postgres/get.rb', line 103

# Converts a raw result record into message data.
#
# The record is updated in place: 'data' and 'metadata' are replaced with
# their deserialized forms and 'time' with its UTC-coerced value, then the
# record is handed to MessageData::Read.build.
#
# @param record [Hash] row with string keys 'data', 'metadata' and 'time'
# @return [Object] whatever MessageData::Read.build returns for the record
def self.message_data(record)
  record['data'] = Get::Deserialize.data(record['data'])
  # Use the named metadata deserializer, parallel to .data above; the
  # previous `Deserialize.(...)` form dispatched to `Deserialize.call`.
  record['metadata'] = Get::Deserialize.metadata(record['metadata'])
  record['time'] = Get::Time.utc_coerced(record['time'])

  MessageData::Read.build(record)
end

.specialization(stream_name) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/message_store/postgres/get.rb', line 132

# Selects the reader implementation for a stream name: Category when
# StreamName.category? reports true for the name, otherwise Stream.
#
# @param stream_name [String] the name to classify
# @return [Class] Category or Stream
def self.specialization(stream_name)
  return Category if StreamName.category?(stream_name)

  Stream
end

Instance Method Details

#configure(session: nil) ⇒ Object



44
45
46
# File 'lib/message_store/postgres/get.rb', line 44

# Configures this instance's session by delegating to Session.configure.
#
# @param session [Object, nil] optional session passed through to
#   Session.configure
# @return [Object] whatever Session.configure returns
def configure(session: nil)
  receiver = self

  Session.configure(receiver, session: session)
end

#convert(result) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/message_store/postgres/get.rb', line 91

# Maps every record of a query result to message data via Get.message_data,
# logging the record count before and the converted count after.
#
# @param result [#map, #ntuples] query result to convert
# @return [Array] one converted entry per record, in result order
def convert(result)
  logger.trace(tag: :get) do
    "Converting result to message data (Result Count: #{result.ntuples})"
  end

  converted = result.map { |record| Get.message_data(record) }

  logger.debug(tag: :get) do
    "Converted result to message data (Message Data Count: #{converted.length})"
  end

  converted
end

#get_result(stream_name, position) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/message_store/postgres/get.rb', line 75

# Executes the reader's SQL command against the session and returns the raw
# result, logging before and after.
#
# A PG::RaiseException raised during execution is handed to #raise_error.
#
# @param stream_name [String] name used to compute parameter values and log text
# @param position [Object] position used to compute parameter values and log text
# @return [Object] the result returned by +session.execute+
def get_result(stream_name, position)
  logger.trace(tag: :get) do
    "Getting result (#{log_text(stream_name, position)})"
  end

  values = parameter_values(stream_name, position)

  result = nil
  begin
    result = session.execute(sql_command, values)
  rescue PG::RaiseException => pg_error
    raise_error(pg_error)
  end

  logger.debug(tag: :get) do
    "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})"
  end

  result
end

#raise_error(pg_error) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/message_store/postgres/get.rb', line 111

# Translates a database error into a more specific error and raises it.
#
# The cleaned error message is offered first to Condition.error and, if that
# yields nil, to #specialize_error. When either produces an error, the
# message is logged and that error is raised; otherwise the original
# +pg_error+ is re-raised untouched.
#
# @param pg_error [Exception] the error raised by the database session
# @raise [Exception] the translated error, or +pg_error+ itself
def raise_error(pg_error)
  message = Get.error_message(pg_error)

  translated = Condition.error(message)
  translated = specialize_error(message) if translated.nil?

  # Nothing recognized the message: surface the original database error.
  raise pg_error if translated.nil?

  logger.error { message }
  raise translated
end