Class: MarjAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/marj_adapter.rb

Overview

ActiveJob queue adapter for Marj.

In addition to the standard ActiveJob queue adapter API, this adapter provides:

  • A query method which can be used to query enqueued jobs

  • A discard method which can be used to discard enqueued jobs.

  • A delete method which can be used to delete enqueued jobs.

Although it is possible to access the adapter directly in order to query, discard or delete, it is recommended to use the Marj module.

See github.com/nicholasdower/marj

Instance Method Summary collapse

Constructor Details

#initialize(record_class: 'Marj::Record', discard: proc { |job| delete(job) }) ⇒ MarjAdapter

Creates a new adapter which will enqueue jobs using the given ActiveRecord class.

Parameters:

  • record_class (Class, String) (defaults to: 'Marj::Record')

    the ActiveRecord class (or its name) to use, defaults to Marj::Record

  • discard (Proc) (defaults to: proc { |job| delete(job) })

    the proc to use to discard jobs, defaults to delegating to #delete



22
23
24
25
# File 'lib/marj_adapter.rb', line 22

def initialize(record_class: 'Marj::Record', discard: proc { |job| delete(job) })
  @record_class = record_class
  @discard_proc = discard
end

Instance Method Details

#delete(job) ⇒ ActiveJob::Base

Deletes the record associated with the specified job.

Returns:

  • (ActiveJob::Base)

    the deleted job



144
145
146
# File 'lib/marj_adapter.rb', line 144

def delete(job)
  job.tap { destroy_record(job) }
end

#discard(job, run_callbacks: true) ⇒ ActiveJob::Base

Discards the specified job.

Parameters:

  • job (ActiveJob::Base)

    the job being discarded

  • run_callbacks (Boolean) (defaults to: true)

    whether to run the after_discard callbacks

Returns:

  • (ActiveJob::Base)

    the discarded job



134
135
136
137
138
139
# File 'lib/marj_adapter.rb', line 134

def discard(job, run_callbacks: true)
  job.tap do
    @discard_proc.call(job)
    run_after_discard_callbacks(job) if run_callbacks
  end
end

#enqueue(job) ⇒ ActiveJob::Base

Enqueue a job for immediate execution.

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

Returns:

  • (ActiveJob::Base)

    the enqueued job



31
32
33
# File 'lib/marj_adapter.rb', line 31

def enqueue(job)
  enqueue_at(job)
end

#enqueue_at(job, timestamp = nil) ⇒ ActiveJob::Base

Enqueue a job for execution at the specified time.

Parameters:

  • job (ActiveJob::Base)

    the job to enqueue

  • timestamp (Numeric, NilClass) (defaults to: nil)

    optional number of seconds since Unix epoch at which to execute the job

Returns:

  • (ActiveJob::Base)

    the enqueued job



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/marj_adapter.rb', line 40

def enqueue_at(job, timestamp = nil)
  job.scheduled_at = timestamp ? Time.at(timestamp).utc : nil

  # Argument serialization is done by ActiveJob. ActiveRecord expects deserialized arguments.
  serialized = job.serialize.symbolize_keys!.without(:provider_job_id).merge(arguments: job.arguments)

  # Serialize sets locale to I18n.locale.to_s and enqueued_at to Time.now.utc.iso8601(9).
  # Update the job to reflect what is being enqueued.
  job.locale = serialized[:locale]
  job.enqueued_at = Time.iso8601(serialized[:enqueued_at]).utc

  # When a job is enqueued, we must create/update the corresponding database record. We also must ensure callbacks
  # are registered on the job instance so that when the job is executed, the database record is deleted or updated
  # (depending on the result).
  #
  # We keep track of whether callbacks have been registered by setting the @record instance variable on the job's
  # singleton class. This holds a reference to the record. This ensures that if execute is called on a record
  # instance, any updates to the database are reflected on that record instance.
  if (existing_record = job.singleton_class.instance_variable_get(:@record))
    # This job instance has already been associated with a database row.
    if record_class.exists?(job_id: job.job_id)
      # The database row still exists, we simply need to update it.
      existing_record.update!(serialized)
    else
      # Someone else deleted the database row, we need to recreate and reload the existing record instance. We don't
      # want to register the new instance because someone might still have a reference to the existing one.
      record_class.create!(serialized)
      existing_record.reload
    end
  else
    # This job instance has not been associated with a database row.
    if (new_record = record_class.find_by(job_id: job.job_id))
      # The database row already exists. Update it.
      new_record.update!(serialized)
    else
      # The database row does not exist. Create it.
      new_record = record_class.create!(serialized)
    end
    new_record.send(:register_callbacks, job)
  end
  job
end

#query(*args, **kwargs) ⇒ Object

Queries enqueued jobs. Similar to ActiveRecord.where with a few additional features:

  • Symbol arguments are treated as ActiveRecord scopes.

  • If only a job ID is specified, the corresponding job is returned.

  • If :limit is specified, the maximum number of jobs is limited.

  • If :order is specified, the jobs are ordered by the given attribute.

By default jobs are ordered by when they should be executed.

Example usage:

query                       # Returns all jobs
query(:all)                 # Returns all jobs
query(:due)                 # Returns jobs which are due to be executed
query(:due, limit: 10)      # Returns at most 10 jobs which are due to be executed
query(job_class: Foo)       # Returns all jobs with job_class Foo
query(:due, job_class: Foo) # Returns jobs which are due to be executed with job_class Foo
query(queue_name: 'foo')    # Returns all jobs in the 'foo' queue
query(job_id: '123')        # Returns the job with job_id '123' or nil if no such job exists
query('123')                # Returns the job with job_id '123' or nil if no such job exists


101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/marj_adapter.rb', line 101

def query(*args, **kwargs)
  args, kwargs = args.dup, kwargs.dup.symbolize_keys
  kwargs = kwargs.merge(job_id: kwargs.delete(:id)) if kwargs.key?(:id)
  kwargs[:job_id] = args.shift if args.size == 1 && args.first.is_a?(String) && args.first.match(JOB_ID_REGEX)

  if args.empty? && kwargs.size == 1 && kwargs.key?(:job_id)
    return record_class.find_by(job_id: kwargs[:job_id])&.to_job
  end

  symbol_args, args = args.partition { _1.is_a?(Symbol) }
  symbol_args.delete(:all)
  limit = kwargs.delete(:limit)
  relation = record_class.all
  relation = relation.order(kwargs.delete(:order)) if kwargs.key?(:order)
  relation = relation.where(*args, **kwargs) if args.any? || kwargs.any?
  relation = relation.limit(limit) if limit
  relation = relation.send(symbol_args.shift) while symbol_args.any?
  relation = relation.by_due_date if relation.is_a?(ActiveRecord::Relation) && relation.order_values.empty?

  if relation.is_a?(Enumerable)
    relation.map(&:to_job)
  elsif relation.is_a?(record_class)
    relation.to_job
  else
    relation
  end
end