Module: MessageStore::Postgres::Get

Included in:
Category, Stream
Defined in:
lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/condition.rb,
lib/message_store/postgres/get/stream/last.rb,
lib/message_store/postgres/get/category/correlation.rb,
lib/message_store/postgres/get/category/consumer_group.rb

Defined Under Namespace

Modules: BatchSize, Call, Condition, Defaults, Deserialize, Time Classes: Category, Stream

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(stream_name, **args) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/message_store/postgres/get.rb', line 32

def self.build(stream_name, **args)
  cls = specialization(stream_name)

  session = args.delete(:session)

  cls.build(stream_name, **args).tap do |instance|
    instance.configure(session: session)
  end
end

.call(stream_name, **args) ⇒ Object



54
55
56
57
58
# File 'lib/message_store/postgres/get.rb', line 54

def self.call(stream_name, **args)
  position = args.delete(:position)
  instance = build(stream_name, **args)
  instance.(position)
end

.configure(receiver, stream_name, **args) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/message_store/postgres/get.rb', line 42

def self.configure(receiver, stream_name, **args)
  attr_name = args.delete(:attr_name)
  attr_name ||= :get

  instance = build(stream_name, **args)
  receiver.public_send("#{attr_name}=", instance)
end

.error_message(pg_error) ⇒ Object



134
135
136
# File 'lib/message_store/postgres/get.rb', line 134

def self.error_message(pg_error)
  pg_error.message.gsub('ERROR:', '').strip
end

.included(cls) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/message_store/postgres/get.rb', line 4

def self.included(cls)
  cls.class_exec do
    include MessageStore::Get

    prepend Call
    prepend BatchSize

    dependency :session, Session

    abstract :configure
    abstract :stream_name
    abstract :sql_command
    abstract :parameters
    abstract :parameter_values
    abstract :last_position
    abstract :log_text

    virtual :specialize_error
    virtual :assure
  end
end

.message_data(record) ⇒ Object



109
110
111
112
113
114
115
# File 'lib/message_store/postgres/get.rb', line 109

def self.message_data(record)
  record['data'] = Get::Deserialize.data(record['data'])
  record['metadata'] = Get::Deserialize.(record['metadata'])
  record['time'] = Get::Time.utc_coerced(record['time'])

  MessageData::Read.build(record)
end

.specialization(stream_name) ⇒ Object



138
139
140
141
142
143
144
# File 'lib/message_store/postgres/get.rb', line 138

def self.specialization(stream_name)
  if StreamName.category?(stream_name)
    Category
  else
    Stream
  end
end

Instance Method Details

#configure(session: nil) ⇒ Object



50
51
52
# File 'lib/message_store/postgres/get.rb', line 50

def configure(session: nil)
  Session.configure(self, session: session)
end

#convert(result) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/message_store/postgres/get.rb', line 97

def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  message_data = result.map do |record|
    Get.message_data(record)
  end

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" }

  message_data
end

#get_result(stream_name, position) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/message_store/postgres/get.rb', line 81

def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }

  parameter_values = parameter_values(stream_name, position)

  begin
    result = session.execute(sql_command, parameter_values)
  rescue PG::RaiseException => e
    raise_error(e)
  end

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" }

  result
end

#raise_error(pg_error) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/message_store/postgres/get.rb', line 117

def raise_error(pg_error)
  error_message = Get.error_message(pg_error)

  error = Condition.error(error_message)

  if error.nil?
    error = specialize_error(error_message)
  end

  if not error.nil?
    logger.error { error_message }
    raise error
  end

  raise pg_error
end