Module: Deimos::KafkaSource::ClassMethods

Defined in:
lib/deimos/kafka_source.rb

Overview

:nodoc:

Instance Method Summary

Instance Method Details

#import_without_validations_or_callbacks(column_names, array_of_attributes, options = {}) ⇒ Object

This is an internal method of the activerecord-import gem. It performs the actual import after the inputs (arrays, hashes, records, etc.) have been normalized. This override runs the import first, then rebuilds the attribute hash of each imported record (back-filling primary keys where needed) and sends those hashes to Kafka.



# File 'lib/deimos/kafka_source.rb', line 87

def import_without_validations_or_callbacks(column_names,
                                            array_of_attributes,
                                            options={})
  results = super
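  # Always hand the import result back to the caller, even when nothing is published.
  return results unless self.kafka_config[:import]
  return results if array_of_attributes.empty?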

  # Collect the primary keys of the imported rows; they are merged into the
  # attribute hashes built below, one hash per imported row.
  ids = if results.is_a?(Array)
          results[1]
        elsif results.respond_to?(:ids)
          results.ids
        else
          []
        end
  if ids.blank?
    # re-fill IDs based on what was just entered into the DB.
    if self.connection.adapter_name.downcase =~ /sqlite/
      last_id = self.connection.select_value('select last_insert_rowid()')
      ids = ((last_id - array_of_attributes.size + 1)..last_id).to_a
    else # mysql
      # LAST_INSERT_ID() returns the first ID generated by a multi-row insert.
      last_id = self.connection.select_value('select LAST_INSERT_ID()')
      ids = (last_id...(last_id + array_of_attributes.size)).to_a
    end
  end
  array_of_hashes = []
  array_of_attributes.each_with_index do |array, i|
    hash = column_names.zip(array).to_h.with_indifferent_access
    hash[self.primary_key] = ids[i] if hash[self.primary_key].blank?
    array_of_hashes << hash
  end

  self.kafka_producers.each { |p| p.send_events(array_of_hashes) }
  results
end
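
The hash-building step above can be sketched in plain Ruby, without ActiveRecord or Deimos. The helper name `rows_to_hashes`, the `primary_key:` keyword, and the sample data are hypothetical and exist only for illustration; the ID back-fill assumes the database assigned consecutive IDs starting at the first generated one.

```ruby
# Hypothetical helper, not part of Deimos: pairs each row with its column
# names and back-fills a missing primary key from a list of IDs.
def rows_to_hashes(column_names, array_of_attributes, ids, primary_key: 'id')
  array_of_attributes.each_with_index.map do |row, i|
    hash = column_names.zip(row).to_h
    # Only fill the key in when the row did not supply one.
    hash[primary_key] = ids[i] if hash[primary_key].nil?
    hash
  end
end

# MySQL-style back-fill: start at the first generated ID and take one ID
# per imported row (assumes consecutive allocation).
first_id = 10
rows = [['apple', 3], ['pear', 5]]
ids = (first_id...(first_id + rows.size)).to_a

events = rows_to_hashes(%w[name quantity], rows, ids)
```

Each element of `events` is the kind of attribute hash that would be handed to a producer's `send_events`.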

#kafka_config ⇒ Hash

Returns:

  • (Hash)


# File 'lib/deimos/kafka_source.rb', line 56

def kafka_config
  {
    update: true,
    delete: true,
    import: true,
    create: true
  }
end
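
A class would typically switch off one of these flags by overriding `kafka_config` and merging into the defaults. The following is a minimal plain-Ruby sketch of that pattern; `DefaultKafkaConfig` and `Widget` are hypothetical stand-ins, not Deimos classes.

```ruby
# Hypothetical stand-in for the defaults shown above.
module DefaultKafkaConfig
  def kafka_config
    { update: true, delete: true, import: true, create: true }
  end
end

# Hypothetical model class that opts out of publishing on bulk import.
class Widget
  extend DefaultKafkaConfig

  def self.kafka_config
    # `super` resolves to the module's defaults; only :import changes.
    super.merge(import: false)
  end
end

config = Widget.kafka_config
```

With this override, the `return ... unless self.kafka_config[:import]` guard in the import method would skip publishing, while create, update, and delete stay enabled.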

#kafka_producer ⇒ Deimos::ActiveRecordProducer

Deprecated - use #kafka_producers instead.

Returns:

  • (Deimos::ActiveRecordProducer)

Raises:

  • (NotImplementedError)


# File 'lib/deimos/kafka_source.rb', line 75

def kafka_producer
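  # Guard against infinite recursion: raise if neither method was overridden.
  # The default methods are owned by the ClassMethods module.
  raise NotImplementedError if self.method(:kafka_producers).
    owner == Deimos::KafkaSource::ClassMethods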

  self.kafka_producers.first
end

#kafka_producers ⇒ Array<Deimos::ActiveRecordProducer>

Returns the producers to run.

Returns:

  • (Array<Deimos::ActiveRecordProducer>)

Raises:

  • (NotImplementedError)


# File 'lib/deimos/kafka_source.rb', line 66

def kafka_producers
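  # Guard against infinite recursion: raise if neither method was overridden.
  # The default methods are owned by the ClassMethods module.
  raise NotImplementedError if self.method(:kafka_producer).
    owner == Deimos::KafkaSource::ClassMethods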

  [self.kafka_producer]
end
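
The two methods default to each other, so a class needs to override only one of them; the owner check is what stops the defaults from calling each other forever. Below is a plain-Ruby sketch of that pattern, assuming a hypothetical `SourceClassMethods` module and symbol placeholders in place of real producer classes.

```ruby
# Hypothetical module mirroring the mutual-default pattern above.
module SourceClassMethods
  def kafka_producers
    # If kafka_producer is still the default, nothing was overridden.
    raise NotImplementedError if method(:kafka_producer).owner == SourceClassMethods

    [kafka_producer]
  end

  def kafka_producer
    raise NotImplementedError if method(:kafka_producers).owner == SourceClassMethods

    kafka_producers.first
  end
end

# Overrides only the singular (deprecated) form.
class SingleProducerModel
  extend SourceClassMethods

  def self.kafka_producer
    :widget_producer
  end
end

# Overrides only the plural form.
class MultiProducerModel
  extend SourceClassMethods

  def self.kafka_producers
    %i[first_producer second_producer]
  end
end

# Overrides neither, so both defaults raise.
class UnconfiguredModel
  extend SourceClassMethods
end
```

Here `SingleProducerModel.kafka_producers` wraps the single producer in an array, `MultiProducerModel.kafka_producer` returns the first entry of its list, and either call on `UnconfiguredModel` raises `NotImplementedError`.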