Class: River::Driver::ActiveRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/driver.rb

Overview

Provides an ActiveRecord driver for River.

Used in conjunction with a River client like:

ActiveRecord::Base.establish_connection("postgres://...")
client = River::Client.new(River::Driver::ActiveRecord.new)

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ActiveRecord

Returns a new instance of ActiveRecord.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/driver.rb', line 10

# Builds the driver and, the first time one is constructed, defines the
# `River::Driver::ActiveRecord::RiverJob` model backed by the `river_job`
# table. Later constructions find the constant already set and do nothing.
def initialize
  # It's Ruby, so we can only define a model after ActiveRecord's established a
  # connection because it's all dynamic.
  #
  # `inherit` is passed as `false` so that only a constant defined directly on
  # this class counts. With the default (`true`) the lookup also walks up to
  # Object, so an unrelated top-level `::RiverJob` would make this check pass
  # and the patched model below would never be defined.
  return if River::Driver::ActiveRecord.const_defined?(:RiverJob, false)

  River::Driver::ActiveRecord.const_set(:RiverJob, Class.new(::ActiveRecord::Base) do
    self.table_name = "river_job"

    # Unfortunately, Rails errors if you have a column called `errors` and
    # provides no way to remap names (beyond ignoring a column, which we
    # really don't want). This patch is in place so we can hydrate this
    # model at all without ActiveRecord self-immolating.
    def self.dangerous_attribute_method?(method_name)
      return false if method_name == "errors"
      super
    end

    # See the comment above: because `errors` is force-allowed as an
    # attribute name, ActiveRecord would otherwise fail to save a row when
    # it consults its own `errors` collection. Always report an empty hash.
    def errors = {}
  end)
end

Instance Method Details

#job_get_by_id(id) ⇒ Object



34
35
36
37
# File 'lib/driver.rb', line 34

# Looks up a single job by its `id` column.
#
# @param id [Object] value matched against the `id` column of `RiverJob`
# @return [Object, nil] the result of `to_job_row_from_model` for the
#   matching record, or nil when no record matches
def job_get_by_id(id)
  # Fetch the record once and reuse it. Calling `first` twice on an unloaded
  # relation runs the query twice, and the row could disappear in between,
  # which would hand nil to `to_job_row_from_model`.
  model = RiverJob.find_by(id: id)
  model && to_job_row_from_model(model)
end

#job_insert(insert_params) ⇒ Object



39
40
41
# File 'lib/driver.rb', line 39

# Inserts one job by delegating to `job_insert_many` with a single-element
# list and returning the first entry of what it gives back.
#
# @param insert_params [Object] parameters for the job to insert
# @return [Object] first element of the `job_insert_many` result
def job_insert(insert_params)
  results = job_insert_many([insert_params])
  results.first
end

#job_insert_many(insert_params_many) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/driver.rb', line 43

# Upserts a batch of jobs with a single `RiverJob.upsert_all` call and
# converts the returned rows with `to_insert_results`.
#
# @param insert_params_many [Enumerable] one entry per job; each is turned
#   into an attribute hash by `insert_params_to_hash`
# @return [Object] whatever `to_insert_results` produces for the upsert result
def job_insert_many(insert_params_many)
  rows = insert_params_many.map { |insert_params| insert_params_to_hash(insert_params) }

  upsert_result = RiverJob.upsert_all(
    rows,
    on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
    returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),

    # Ideally this would be expressed as `(kind, unique_key) WHERE unique_key
    # IS NOT NULL` like we do elsewhere. ActiveRecord, however, tries to
    # resolve `unique_by` to a unique index itself rather than leaving that
    # to Postgres, and its lookup has no support for a `WHERE` clause. The
    # workaround is to target the index by name instead of by columns.
    unique_by: "river_job_unique_idx"
  )

  to_insert_results(upsert_result)
end

#job_list ⇒ Object



59
60
61
62
# File 'lib/driver.rb', line 59

# Lists every job ordered by `id`, converting each record with
# `to_job_row_from_model`.
#
# @return [Array] one converted row per `RiverJob` record, in `id` order
def job_list
  RiverJob.order(:id).all.map { |model| to_job_row_from_model(model) }
end

#rollback_exception ⇒ Object



64
65
66
# File 'lib/driver.rb', line 64

# Exception class associated with rolling back a transaction under this
# driver.
#
# @return [Class] `::ActiveRecord::Rollback`
def rollback_exception = ::ActiveRecord::Rollback

#transaction ⇒ Object



68
69
70
# File 'lib/driver.rb', line 68

# Runs the given block inside `::ActiveRecord::Base.transaction`, always
# passing `requires_new: true`.
#
# @yield the work to perform inside the transaction
# @return [Object] whatever `::ActiveRecord::Base.transaction` returns
def transaction(&block)
  ::ActiveRecord::Base.transaction(requires_new: true, &block)
end