Class: Deimos::Utils::DbPoller::TimeBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/time_based.rb

Overview

Poller that uses ID and updated_at to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE, Base::FATAL_CODES

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#handle_message_too_large, #initialize, #log_identifier, #process_batch, #process_batch_with_span, #producer_classes, producers, #retrieve_poll_info, #should_run?, #start, #stop, #validate_producer_class

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#create_poll_infoObject

:nodoc:



12
13
14
15
16
17
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12

def create_poll_info
  new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now
  Deimos::PollInfo.create!(producer: @resource_class.to_s,
                           last_sent: new_time,
                           last_sent_id: 0)
end

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/deimos/utils/db_poller/time_based.rb', line 56

def fetch_results(time_from, time_to)
  id = self.producer_classes.first.record_class.primary_key
  quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
  @resource_class.poll_query(time_from: time_from,
                             time_to: time_to,
                             column_name: @config.timestamp_column,
                             min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone



70
71
72
# File 'lib/deimos/utils/db_poller/time_based.rb', line 70

def last_updated(record)
  record.public_send(@config.timestamp_column)
end

#process_and_touch_info(batch, status) ⇒ Object



21
22
23
24
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21

def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  self.touch_info(batch)
end

#process_updatesvoid

This method returns an undefined value.

Send messages for updated data.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28

def process_updates
  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos::Logging.log_info("Polling #{log_identifier} from #{time_from} to #{time_to}")
  status = PollStatus.new(0, 0, 0)
  first_batch = true

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos::Logging.log_debug("Polling #{log_identifier}, batch #{status.current_batch}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    first_batch = false
    process_and_touch_info(batch, status)
    time_from = last_updated(batch.last)
  end

  # If there were no results at all, we update last_sent so that we still get a wait
  # before the next poll.
  @info.touch(:last_sent) if first_batch
  Deimos::Logging.log_info("Poll #{log_identifier} complete at #{time_to} (#{status.report})")
end

#touch_info(batch) ⇒ void



76
77
78
79
80
81
82
83
# File 'lib/deimos/utils/db_poller/time_based.rb', line 76

def touch_info(batch)
  record = batch.last
  id_method = record.class.primary_key
  last_id = record.public_send(id_method)
  last_updated_at = last_updated(record)
  @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id }
  @info.save!
end