Module: MessageStore::Postgres::Get
- Included in:
- Category, Stream
- Defined in:
- lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/condition.rb,
lib/message_store/postgres/get/stream/last.rb,
lib/message_store/postgres/get/category/correlation.rb,
lib/message_store/postgres/get/category/consumer_group.rb
Defined Under Namespace
Modules: BatchSize, Call, Condition, Defaults, Deserialize, Time
Classes: Category, Stream
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.build(stream_name, **args) ⇒ Object
32
33
34
35
36
37
38
39
40
|
# File 'lib/message_store/postgres/get.rb', line 32
# Builds a configured reader for the given stream name.
#
# The implementing class is chosen by `specialization(stream_name)`. The
# `:session` keyword, when given, is removed from `args` and handed to the
# new instance's `configure`; every other keyword is forwarded unchanged to
# the implementing class's `build`.
#
# @param stream_name [Object] name passed to `specialization` and to `build`
# @param args [Hash] keyword arguments; `:session` is consumed here
# @return [Object] the built and configured instance
def self.build(stream_name, **args)
  # Pull the session out first so it is not forwarded to the class-level build.
  session = args.delete(:session)

  instance = specialization(stream_name).build(stream_name, **args)
  instance.configure(session: session)
  instance
end
|
.call(stream_name, **args) ⇒ Object
54
55
56
57
58
|
# File 'lib/message_store/postgres/get.rb', line 54
# Builds a reader for the stream name and immediately invokes it.
#
# The `:position` keyword is removed from `args` and passed to the built
# instance's `call`; the remaining keywords go to `build`.
#
# @param stream_name [Object] name forwarded to `build`
# @param args [Hash] keyword arguments; `:position` is consumed here
# @return [Object] whatever the built instance returns when called
def self.call(stream_name, **args)
  start_position = args.delete(:position)
  build(stream_name, **args).call(start_position)
end
|
.configure(receiver, stream_name, **args) ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/message_store/postgres/get.rb', line 42
# Builds a reader and assigns it to an attribute of the receiver.
#
# The attribute is named by the `:attr_name` keyword and defaults to `:get`.
# The assignment goes through the receiver's public writer (`<attr_name>=`).
# All other keywords are forwarded to `build`.
#
# @param receiver [Object] object exposing a public `<attr_name>=` writer
# @param stream_name [Object] name forwarded to `build`
# @param args [Hash] keyword arguments; `:attr_name` is consumed here
# @return [Object] the result of the writer call
def self.configure(receiver, stream_name, **args)
  writer = "#{args.delete(:attr_name) || :get}="
  receiver.public_send(writer, build(stream_name, **args))
end
|
.error_message(pg_error) ⇒ Object
134
135
136
|
# File 'lib/message_store/postgres/get.rb', line 134
# Extracts a bare message from a database error.
#
# Every occurrence of the literal text `ERROR:` is removed from the error's
# message, then surrounding whitespace is stripped.
#
# @param pg_error [#message] error object whose message is cleaned up
# @return [String] the cleaned message text
def self.error_message(pg_error)
  raw_text = pg_error.message
  without_marker = raw_text.gsub('ERROR:', '')
  without_marker.strip
end
|
.included(cls) ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/message_store/postgres/get.rb', line 4
# Hook run when this module is included into a class.
#
# Evaluated in the context of the including class, it:
# - includes MessageStore::Get,
# - prepends Call and then BatchSize (so BatchSize ends up first in the
#   ancestor chain),
# - declares the :session dependency,
# - declares the abstract and virtual method names listed below, one macro
#   call per name and in the listed order.
#
# @param cls [Class] the class including this module
# @return [Object] the result of the class_exec block
def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    prepend Call
    prepend BatchSize

    dependency :session, Session

    %i[
      configure
      stream_name
      sql_command
      parameters
      parameter_values
      last_position
      log_text
    ].each { |method_name| abstract method_name }

    %i[specialize_error assure].each { |method_name| virtual method_name }
  end
end
|
.message_data(record) ⇒ Object
109
110
111
112
113
114
115
|
# File 'lib/message_store/postgres/get.rb', line 109
# Converts a raw result record into a MessageData::Read instance.
#
# The record's 'data' and 'metadata' values are passed through
# Get::Deserialize, and its 'time' value through Get::Time.utc_coerced.
# All other keys are carried over as-is.
#
# The converted values are written into a copy of the record, so the hash
# supplied by the caller is left untouched (and may be frozen).
#
# @param record [Hash] string-keyed record with 'data', 'metadata' and 'time'
# @return [Object] the result of MessageData::Read.build
def self.message_data(record)
  # merge returns a new hash; existing keys keep their original position.
  attributes = record.merge(
    'data' => Get::Deserialize.data(record['data']),
    'metadata' => Get::Deserialize.metadata(record['metadata']),
    'time' => Get::Time.utc_coerced(record['time'])
  )

  MessageData::Read.build(attributes)
end
|
.specialization(stream_name) ⇒ Object
138
139
140
141
142
143
144
|
# File 'lib/message_store/postgres/get.rb', line 138
# Selects the class that implements retrieval for the given stream name.
#
# @param stream_name [Object] name tested with StreamName.category?
# @return [Class] Category when StreamName.category? is truthy for the
#   name, otherwise Stream
def self.specialization(stream_name)
  return Category if StreamName.category?(stream_name)

  Stream
end
|
Instance Method Details
#configure(session: nil) ⇒ Object
50
51
52
|
# File 'lib/message_store/postgres/get.rb', line 50
# Configures this instance's session by delegating to Session.configure.
#
# @param session [Object, nil] session forwarded to Session.configure;
#   nil when the caller supplies none
# @return [Object] the result of Session.configure
def configure(session: nil)
Session.configure(self, session: session)
end
|
#convert(result) ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/message_store/postgres/get.rb', line 97
# Converts each record of a query result into message data.
#
# Logs at trace level before converting (with the result's tuple count) and
# at debug level afterwards (with the number of converted items).
#
# @param result [#map, #ntuples] query result yielding one record per item
# @return [Array] one Get.message_data value per record
def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  result.map { |row| Get.message_data(row) }.tap do |converted|
    logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{converted.length})" }
  end
end
|
#get_result(stream_name, position) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/message_store/postgres/get.rb', line 81
# Executes the retrieval query and returns the raw result.
#
# Parameter values come from `parameter_values(stream_name, position)` and
# are executed against `sql_command` on the session. A PG::RaiseException
# raised by the session is handed to `raise_error`. Trace and debug log
# entries are written before and after the query.
#
# @param stream_name [Object] name forwarded to parameter_values and log_text
# @param position [Object] position forwarded to parameter_values and log_text
# @return [Object] the result returned by session.execute
def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }

  # Named distinctly from the parameter_values method it is produced by.
  values = parameter_values(stream_name, position)

  begin
    result = session.execute(sql_command, values)
  rescue PG::RaiseException => pg_error
    raise_error(pg_error)
  end

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" }

  result
end
|
#raise_error(pg_error) ⇒ Object
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/message_store/postgres/get.rb', line 117
# Raises the most specific error available for a database error.
#
# The cleaned message from Get.error_message is offered first to
# Condition.error and, if that yields nil, to specialize_error. When either
# produces an error, the message is logged at error level and that error is
# raised. Otherwise the original database error is raised unchanged.
#
# @param pg_error [Exception] the error raised by the database session
# @raise [Exception] the translated error, or pg_error itself
def raise_error(pg_error)
  message = Get.error_message(pg_error)

  translated = Condition.error(message)
  translated = specialize_error(message) if translated.nil?

  # Nothing recognised the message: surface the database error as-is.
  raise pg_error if translated.nil?

  logger.error { message }
  raise translated
end
|