Class: ActiveRecordTransactioner

Inherits:
Object
  • Object
show all
Defined in:
lib/active-record-transactioner.rb

Constant Summary collapse

DEFAULT_ARGS =
{
  call_args: [],
  call_method: :save!,
  transaction_method: :transaction,
  transaction_size: 1000,
  threadded: false,
  max_running_threads: 2,
  debug: false
}
EMPTY_ARGS =
[]
ALLOWED_ARGS =
DEFAULT_ARGS.keys

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ ActiveRecordTransactioner

Returns a new instance of ActiveRecordTransactioner.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/active-record-transactioner.rb', line 18

def initialize(args = {})
  args.each_key { |key| raise "Invalid key: '#{key}'." unless ALLOWED_ARGS.include?(key) }

  @args = DEFAULT_ARGS.merge(args)
  parse_and_set_args

  return unless block_given?

  begin
    yield self
  ensure
    flush
    join if threadded?
  end
end

Instance Method Details

#bulk_create!(model) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/active-record-transactioner.rb', line 40

def bulk_create!(model)
  attributes = model.attributes
  attributes.delete("id")
  attributes.delete("created_at")
  attributes.delete("updated_at")

  klass = model.class
  @bulk_creates[klass] ||= []
  @bulk_creates[klass] << attributes

  @count += 1
end

#destroy!(model) ⇒ Object



61
62
63
# File 'lib/active-record-transactioner.rb', line 61

def destroy!(model)
  queue(model, type: :destroy!)
end

#flushObject

Flushes the specified method on all the queued models in a thread for each type of model.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/active-record-transactioner.rb', line 91

def flush
  wait_for_threads if threadded?

  @lock.synchronize do
    @bulk_creates.each do |klass, attribute_array|
      if threadded?
        bulk_insert_attribute_array_threadded(klass, attribute_array)
      else
        bulk_insert_attribute_array(klass, attribute_array)
      end
    end

    @models.each do |klass, models|
      next if models.empty?

      @models[klass] = []
      @count -= models.length

      if threadded?
        work_threadded(klass, models)
      else
        work_models_through_transaction(klass, models)
      end
    end
  end
end

#joinObject

Waits for any remaining running threads.



119
120
121
122
123
124
# File 'lib/active-record-transactioner.rb', line 119

def join
  threads_to_join = @lock_threads.synchronize { @threads.clone }

  debug "Threads to join: #{threads_to_join}" if @debug
  threads_to_join.each(&:join)
end

#queue(model, args = {}) ⇒ Object

Adds another model to the queue and calls ‘flush’ if it is over the limit.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/active-record-transactioner.rb', line 66

def queue(model, args = {})
  args[:type] ||= :save!

  @lock.synchronize do
    klass = model.class

    validate = args.key?(:validate) ? args[:validate] : true

    @lock_models[klass] ||= Monitor.new

    @models[klass] ||= []
    @models[klass] << {
      model: model,
      type: args.fetch(:type),
      validate: validate,
      method_args: args[:method_args] || EMPTY_ARGS
    }

    @count += 1
  end

  flush if should_flush?
end

#save!(model) ⇒ Object

Adds another model to the queue and calls ‘flush’ if it is over the limit.

Raises:

  • (ActiveRecord::RecordInvalid)


35
36
37
38
# File 'lib/active-record-transactioner.rb', line 35

def save!(model)
  raise ActiveRecord::RecordInvalid, model unless model.valid?
  queue(model, type: :save!, validate: false)
end

#threadded?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/active-record-transactioner.rb', line 126

def threadded?
  @args[:threadded]
end

#update_column(model, column_name, new_value) ⇒ Object



57
58
59
# File 'lib/active-record-transactioner.rb', line 57

def update_column(model, column_name, new_value)
  update_columns(model, column_name => new_value)
end

#update_columns(model, updates) ⇒ Object



53
54
55
# File 'lib/active-record-transactioner.rb', line 53

def update_columns(model, updates)
  queue(model, type: :update_columns, validate: false, method_args: [updates])
end