Class: EventSource::Postgres::Get

Inherits:
Object
  • Object
show all
Includes:
Get
Defined in:
lib/event_source/postgres/get.rb,
lib/event_source/postgres/get/last.rb,
lib/event_source/postgres/get/select_statement.rb,
lib/event_source/postgres/get/last/select_statement.rb

Defined Under Namespace

Modules: Deserialize, Time Classes: Last, SelectStatement

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(batch_size: nil, session: nil) ⇒ Object



10
11
12
13
14
# File 'lib/event_source/postgres/get.rb', line 10

def self.build(batch_size: nil, session: nil)
  new(batch_size).tap do |instance|
    instance.configure(session: session)
  end
end

.call(stream_name, position: nil, batch_size: nil, session: nil) ⇒ Object



26
27
28
29
# File 'lib/event_source/postgres/get.rb', line 26

def self.call(stream_name, position: nil, batch_size: nil, session: nil)
  instance = build(batch_size: batch_size, session: session)
  instance.(stream_name, position: position)
end

.configure(receiver, attr_name: nil, position: nil, batch_size: nil, session: nil) ⇒ Object



16
17
18
19
20
# File 'lib/event_source/postgres/get.rb', line 16

def self.configure(receiver, attr_name: nil, position: nil, batch_size: nil, session: nil)
  attr_name ||= :get
  instance = build(batch_size: batch_size, session: session)
  receiver.public_send "#{attr_name}=", instance
end

Instance Method Details

#call(stream_name, position: nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/event_source/postgres/get.rb', line 31

def call(stream_name, position: nil)
  logger.trace { "Getting event data (Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" }

  records = get_records(stream_name, position)

  events = convert(records)

  logger.info { "Finished getting event data (Count: #{events.length}, Position: #{position.inspect}, Stream Name: #{stream_name}, Batch Size: #{batch_size.inspect})" }
  logger.info(tags: [:data, :event_data]) { events.pretty_inspect }

  events
end

#configure(session: nil) ⇒ Object



22
23
24
# File 'lib/event_source/postgres/get.rb', line 22

def configure(session: nil)
  Session.configure self, session: session
end

#convert(records) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/event_source/postgres/get.rb', line 56

def convert(records)
  logger.trace { "Converting records to event data (Records Count: #{records.ntuples})" }

  events = records.map do |record|
    record['data'] = Deserialize.data(record['data'])
    record['metadata'] = Deserialize.(record['metadata'])
    record['time'] = Time.utc_coerced(record['time'])

    EventData::Read.build record
  end

  logger.debug { "Converted records to event data (Event Data Count: #{events.length})" }

  events
end

#get_records(stream_name, position) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/event_source/postgres/get.rb', line 44

def get_records(stream_name, position)
  logger.trace { "Getting records (Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect})" }

  select_statement = SelectStatement.build(stream_name, position: position, batch_size: batch_size)

  records = session.execute(select_statement.sql)

  logger.debug { "Finished getting records (Count: #{records.ntuples}, Stream: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect})" }

  records
end