Module: MessageStore::Postgres::Get
- Defined in:
- lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/stream/last.rb
Defined Under Namespace
Modules: BatchSize, Call, Defaults, Deserialize, SQLCommand, Time Classes: Category, Stream
Class Method Summary collapse
- .build(stream_name, batch_size: nil, session: nil, condition: nil) ⇒ Object
- .call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
- .configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
- .constrain_condition(condition) ⇒ Object
- .included(cls) ⇒ Object
- .specialization(stream_name) ⇒ Object
Instance Method Summary collapse
- #configure(session: nil) ⇒ Object
- #convert(result) ⇒ Object
- #get_result(stream_name, position) ⇒ Object
Class Method Details
.build(stream_name, batch_size: nil, session: nil, condition: nil) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/message_store/postgres/get.rb', line 24 def self.build(stream_name, batch_size: nil, session: nil, condition: nil) cls = specialization(stream_name) cls.new(stream_name, batch_size, condition).tap do |instance| instance.configure(session: session) end end |
.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
42 43 44 45 |
# File 'lib/message_store/postgres/get.rb', line 42 def self.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) instance.(position) end |
.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object
32 33 34 35 36 |
# File 'lib/message_store/postgres/get.rb', line 32 def self.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) attr_name ||= :get instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) receiver.public_send "#{attr_name}=", instance end |
.constrain_condition(condition) ⇒ Object
85 86 87 88 89 |
# File 'lib/message_store/postgres/get.rb', line 85 def self.constrain_condition(condition) return nil if condition.nil? "(#{condition})" end |
.included(cls) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/message_store/postgres/get.rb', line 4 def self.included(cls) cls.class_exec do include MessageStore::Get prepend Call prepend BatchSize extend SQLCommand dependency :session, Session initializer :stream_name, na(:batch_size), :condition end end |
.specialization(stream_name) ⇒ Object
114 115 116 117 118 119 120 |
# File 'lib/message_store/postgres/get.rb', line 114 def self.specialization(stream_name) if StreamName.category?(stream_name) Category else Stream end end |
Instance Method Details
#configure(session: nil) ⇒ Object
38 39 40 |
# File 'lib/message_store/postgres/get.rb', line 38 def configure(session: nil) Session.configure self, session: session end |
#convert(result) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/message_store/postgres/get.rb', line 98 def convert(result) logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" } = result.map do |record| record['data'] = Deserialize.data(record['data']) record['metadata'] = Deserialize.(record['metadata']) record['time'] = Time.utc_coerced(record['time']) MessageData::Read.build(record) end logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{.length})" } end |
#get_result(stream_name, position) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/message_store/postgres/get.rb', line 64 def get_result(stream_name, position) logger.trace(tag: :get) { "Getting result (Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" } sql_command = self.class.sql_command(stream_name, position, batch_size, condition) cond = Get.constrain_condition(condition) params = [ stream_name, position, batch_size, cond ] result = session.execute(sql_command, params) logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" } result end |