Class: EventSource::Postgres::Put
- Inherits:
-
Object
- Object
- EventSource::Postgres::Put
- Includes:
- Log::Dependency
- Defined in:
- lib/event_source/postgres/put.rb
Class Method Summary collapse
- .build(session: nil) ⇒ Object
- .call(write_event, stream_name, expected_version: nil, session: nil) ⇒ Object
- .configure(receiver, session: nil, attr_name: nil) ⇒ Object
- .statement ⇒ Object
Instance Method Summary collapse
- #call(write_event, stream_name, expected_version: nil) ⇒ Object
- #configure(session: nil) ⇒ Object
- #destructure_event(write_event) ⇒ Object
- #execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version) ⇒ Object
- #insert_event(id, stream_name, type, data, metadata, expected_version) ⇒ Object
- #position(records) ⇒ Object
- #raise_error(pg_error) ⇒ Object
- #serialized_data(data) ⇒ Object
- #serialized_metadata(metadata) ⇒ Object
Class Method Details
.build(session: nil) ⇒ Object
9 10 11 12 13 |
# File 'lib/event_source/postgres/put.rb', line 9
# Creates a new instance and runs its #configure with the given session.
#
# @param session [Object, nil] passed through unchanged to #configure
# @return [Object] the newly created, configured instance
def self.build(session: nil)
  instance = new
  instance.configure(session: session)
  instance
end
.call(write_event, stream_name, expected_version: nil, session: nil) ⇒ Object
26 27 28 29 |
# File 'lib/event_source/postgres/put.rb', line 26
# Class-level shortcut: builds a configured instance and invokes it once.
#
# @param write_event [Object] forwarded to the instance-level #call
# @param stream_name [Object] forwarded to the instance-level #call
# @param expected_version [Object, nil] forwarded to the instance-level #call
# @param session [Object, nil] forwarded to .build
# @return [Object] whatever the instance-level #call returns
def self.call(write_event, stream_name, expected_version: nil, session: nil)
  put = build(session: session)
  put.call(write_event, stream_name, expected_version: expected_version)
end
.configure(receiver, session: nil, attr_name: nil) ⇒ Object
20 21 22 23 24 |
# File 'lib/event_source/postgres/put.rb', line 20
# Builds an instance and assigns it to the receiver through a public writer.
#
# @param receiver [Object] object that responds to "<attr_name>="
# @param session [Object, nil] forwarded to .build
# @param attr_name [Symbol, String, nil] writer base name; :put when not given
# @return [Object] the result of the writer call on the receiver
def self.configure(receiver, session: nil, attr_name: nil)
  target = attr_name || :put
  put = build(session: session)
  writer = "#{target}="
  receiver.public_send(writer, put)
end
.statement ⇒ Object
90 91 92 |
# File 'lib/event_source/postgres/put.rb', line 90
# SQL text used by #execute_query; it takes six positional bind parameters.
#
# The string is memoized on the class and handed to every caller, so it is
# frozen to keep one caller from altering the statement for all the others.
#
# @return [String] the frozen, memoized statement
def self.statement
  @statement ||= "SELECT write_event($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::int);".freeze
end
Instance Method Details
#call(write_event, stream_name, expected_version: nil) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/event_source/postgres/put.rb', line 31
# Writes one event to a stream and returns what #insert_event returns.
#
# An id is taken from `identifier.get` when the event has none yet, so the
# event object passed in may be modified.
#
# @param write_event [Object] responds to id, id=, type, data and metadata
# @param stream_name [Object] stream to write to
# @param expected_version [Object, nil] run through ExpectedVersion.canonize
#   before being handed to #insert_event
# @return [Object] the value returned by #insert_event
def call(write_event, stream_name, expected_version: nil)
  logger.trace { "Putting event data (Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :event_data]) { write_event.pretty_inspect }

  write_event.id ||= identifier.get

  id, type, data, metadata = destructure_event(write_event)
  expected_version = ExpectedVersion.canonize(expected_version)

  insert_event(id, stream_name, type, data, metadata, expected_version).tap do |position|
    logger.info { "Put event data (Position: #{position}, Stream Name: #{stream_name}, Type: #{write_event.type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }
    logger.info(tags: [:data, :event_data]) { write_event.pretty_inspect }
  end
end
#configure(session: nil) ⇒ Object
15 16 17 18 |
# File 'lib/event_source/postgres/put.rb', line 15
# Sets up this instance's collaborators.
#
# Session.configure is given this instance together with the session;
# Identifier::UUID::Random.configure is given this instance only.
#
# @param session [Object, nil] forwarded to Session.configure
# @return [Object] the result of Identifier::UUID::Random.configure
def configure(session: nil)
  Session.configure(self, session: session)

  Identifier::UUID::Random.configure(self)
end
#destructure_event(write_event) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/event_source/postgres/put.rb', line 46
# Reads the four attributes of an event, logs each at debug level, and
# returns them in the order #call destructures them.
#
# @param write_event [Object] responds to id, type, data and metadata
# @return [Array] [id, type, data, metadata]
def destructure_event(write_event)
  id = write_event.id
  type = write_event.type
  data = write_event.data
  metadata = write_event.metadata

  logger.debug(tags: [:data, :event_data]) { "ID: #{id.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Type: #{type.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Data: #{data.pretty_inspect}" }
  logger.debug(tags: [:data, :event_data]) { "Metadata: #{metadata.pretty_inspect}" }

  [id, type, data, metadata]
end
#execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/event_source/postgres/put.rb', line 67
# Runs the class-level statement through the session with six bind values.
#
# @param id [Object] first bind value
# @param stream_name [Object] second bind value
# @param type [Object] third bind value
# @param serialized_data [String, nil] fourth bind value
# @param serialized_metadata [String, nil] fifth bind value
# @param expected_version [Object] sixth bind value
# @return [Object] whatever session.execute returns
# @raise [Exception] PG::RaiseException is routed through #raise_error,
#   which always raises
def execute_query(id, stream_name, type, serialized_data, serialized_metadata, expected_version)
  logger.trace { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  # Order must match the $1..$6 placeholders of the statement.
  params = [
    id,
    stream_name,
    type,
    serialized_data,
    serialized_metadata,
    expected_version
  ]

  begin
    records = session.execute(self.class.statement, params)
  rescue PG::RaiseException => e
    raise_error e
  end

  logger.debug { "Executed insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" }

  records
end
#insert_event(id, stream_name, type, data, metadata, expected_version) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/event_source/postgres/put.rb', line 60
# Serializes data and metadata, executes the insert, and extracts the
# position from the returned records.
#
# @param id [Object] passed to #execute_query unchanged
# @param stream_name [Object] passed to #execute_query unchanged
# @param type [Object] passed to #execute_query unchanged
# @param data [Object, nil] run through #serialized_data first
# @param metadata [Object, nil] run through #serialized_metadata first
# @param expected_version [Object] passed to #execute_query unchanged
# @return [Object, nil] the value returned by #position
def insert_event(id, stream_name, type, data, metadata, expected_version)
  # Locals are named differently from the serializer methods to avoid shadowing them.
  data_text = serialized_data(data)
  metadata_text = serialized_metadata(metadata)

  records = execute_query(id, stream_name, type, data_text, metadata_text, expected_version)

  position(records)
end
#position(records) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/event_source/postgres/put.rb', line 126
# Extracts the first value of the first record.
#
# @param records [#[]] indexable collection whose elements respond to #values
# @return [Object, nil] the first value of records[0], or nil when
#   records[0] is nil
def position(records)
  first_record = records[0]
  return nil if first_record.nil?

  first_record.values[0]
end
#raise_error(pg_error) ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/event_source/postgres/put.rb', line 134
# Translates a database error into ExpectedVersion::Error when its message
# mentions 'Wrong expected version'; any other error is re-raised as is.
#
# @param pg_error [Exception] the error rescued in #execute_query
# @raise [ExpectedVersion::Error] with the 'ERROR:' marker removed and the
#   surrounding whitespace stripped
# @raise [Exception] pg_error itself in every other case
def raise_error(pg_error)
  error_message = pg_error.message

  if error_message.include?('Wrong expected version')
    # Non-destructive calls on purpose: gsub!/strip! return nil when nothing
    # changes (so chaining them can fail), and they would modify the string
    # obtained from pg_error in place.
    error_message = error_message.gsub('ERROR:', '').strip
    logger.error { error_message }
    raise ExpectedVersion::Error, error_message
  end

  raise pg_error
end
#serialized_data(data) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/event_source/postgres/put.rb', line 94 def serialized_data(data) serialized_data = nil if data.is_a?(Hash) && data.empty? data = nil end unless data.nil? serializable_data = EventData::Hash[data] serialized_data = Transform::Write.(serializable_data, :json) end logger.debug(tags: [:data, :serialize]) { "Serialized Data: #{serialized_data.inspect}" } serialized_data end |
#serialized_metadata(metadata) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/event_source/postgres/put.rb', line 110 def () = nil if .is_a?(Hash) && .empty? = nil end unless .nil? = EventData::Hash[] = Transform::Write.(, :json) end logger.debug(tags: [:data, :serialize]) { "Serialized Metadata: #{.inspect}" } end |