Module: MessageStore::Postgres::Get

Included in:
Category, Stream
Defined in:
lib/message_store/postgres/get.rb,
lib/message_store/postgres/get/stream.rb,
lib/message_store/postgres/get/category.rb,
lib/message_store/postgres/get/stream/last.rb

Defined Under Namespace

Modules: BatchSize, Call, Defaults, Deserialize, SQLCommand, Time Classes: Category, Stream

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(stream_name, batch_size: nil, session: nil, condition: nil) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/message_store/postgres/get.rb', line 24

def self.build(stream_name, batch_size: nil, session: nil, condition: nil)
  cls = specialization(stream_name)

  cls.new(stream_name, batch_size, condition).tap do |instance|
    instance.configure(session: session)
  end
end

.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



42
43
44
45
# File 'lib/message_store/postgres/get.rb', line 42

def self.call(stream_name, position: nil, batch_size: nil, condition: nil,  session: nil)
  instance = build(stream_name, batch_size: batch_size, condition: condition, session: session)
  instance.(position)
end

.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) ⇒ Object



32
33
34
35
36
# File 'lib/message_store/postgres/get.rb', line 32

def self.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil)
  attr_name ||= :get
  instance = build(stream_name, batch_size: batch_size, condition: condition, session: session)
  receiver.public_send "#{attr_name}=", instance
end

.constrain_condition(condition) ⇒ Object



85
86
87
88
89
# File 'lib/message_store/postgres/get.rb', line 85

def self.constrain_condition(condition)
  return nil if condition.nil?

  "(#{condition})"
end

.included(cls) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
# File 'lib/message_store/postgres/get.rb', line 4

def self.included(cls)
  cls.class_exec do
    include MessageStore::Get
    prepend Call
    prepend BatchSize

    extend SQLCommand

    dependency :session, Session

    initializer :stream_name, na(:batch_size), :condition
  end
end

.specialization(stream_name) ⇒ Object



114
115
116
117
118
119
120
# File 'lib/message_store/postgres/get.rb', line 114

def self.specialization(stream_name)
  if StreamName.category?(stream_name)
    Category
  else
    Stream
  end
end

Instance Method Details

#configure(session: nil) ⇒ Object



38
39
40
# File 'lib/message_store/postgres/get.rb', line 38

def configure(session: nil)
  Session.configure self, session: session
end

#convert(result) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/message_store/postgres/get.rb', line 98

def convert(result)
  logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }

  message_data = result.map do |record|
    record['data'] = Deserialize.data(record['data'])
    record['metadata'] = Deserialize.(record['metadata'])
    record['time'] = Time.utc_coerced(record['time'])

    MessageData::Read.build(record)
  end

  logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" }

  message_data
end

#get_result(stream_name, position) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/message_store/postgres/get.rb', line 64

def get_result(stream_name, position)
  logger.trace(tag: :get) { "Getting result (Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" }

  sql_command = self.class.sql_command(stream_name, position, batch_size, condition)

  cond = Get.constrain_condition(condition)

  params = [
    stream_name,
    position,
    batch_size,
    cond
  ]

  result = session.execute(sql_command, params)

  logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition || '(none)'})" }

  result
end