Module: Gouda

Defined in:
lib/gouda.rb,
lib/gouda/bulk.rb,
lib/gouda/worker.rb,
lib/gouda/railtie.rb,
lib/gouda/version.rb,
lib/gouda/queue_constraints.rb,
lib/generators/gouda/install_generator.rb,
lib/gouda/active_job_extensions/interrupts.rb,
lib/gouda/active_job_extensions/concurrency.rb

Defined Under Namespace

Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, FiberDatabaseSupport, Scheduler
Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Worker, Workload

Constant Summary

POLL_INTERVAL_DURATION_SECONDS =
1
UNINITIALISED_DATABASE_EXCEPTIONS =
[ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished]
VERSION =
"0.2.0"

Class Method Summary

Class Method Details

.config ⇒ Object



74
75
76
# File 'lib/gouda.rb', line 74

# Returns the process-wide Gouda configuration object.
#
# The Configuration instance is created lazily on first access and the same
# instance is handed back on every later call.
#
# @return [Configuration]
def self.config
  return @config if @config

  @config = Configuration.new
end

.configure {|config| ... } ⇒ Object

Yields:



78
79
80
# File 'lib/gouda.rb', line 78

# Yields the shared configuration object (see .config) so callers can set options.
#
# @yieldparam config [Configuration] the memoized configuration instance
# @return [Object] whatever the block returns
def self.configure
  settings = config
  yield(settings)
end

.create_tables(active_record_schema) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/gouda.rb', line 119

# Creates Gouda's database objects through the given ActiveRecord schema/migration object.
#
# The objects are created in this order:
# - the `gouda_workload_state` enum type;
# - the `gouda_workloads` table (UUID primary key) and its indexes;
# - the `gouda_job_fuses` table (no primary key).
#
# @param active_record_schema [#create_enum, #create_table, #add_index] presumably an
#   ActiveRecord::Schema or migration instance (Postgres-only: uses enum and jsonb columns) — TODO confirm
# @return [Object] the result of the final create_table call
def self.create_tables(active_record_schema)
  schema = active_record_schema

  # The enum type must exist before the column that references it.
  schema.create_enum :gouda_workload_state, %w[enqueued executing finished]

  schema.create_table(:gouda_workloads, id: :uuid) do |table|
    table.uuid :active_job_id, null: false
    table.timestamp :scheduled_at, null: false
    %i[execution_started_at execution_finished_at last_execution_heartbeat_at].each do |column|
      table.timestamp column
    end
    table.timestamp :interrupted_at, null: true

    table.string :scheduler_key, null: true
    table.string :queue_name, null: false, default: "default"
    table.integer :priority
    table.string :active_job_class_name, null: false
    table.jsonb :serialized_params
    table.jsonb :error, default: {}, null: false
    table.enum :state, enum_type: :gouda_workload_state, default: "enqueued", null: false
    %i[execution_concurrency_key enqueue_concurrency_key executing_on].each do |column|
      table.string column
    end
    table.integer :position_in_bulk

    table.timestamps
  end

  # [columns, options] for every index on gouda_workloads, in creation order. The partial
  # unique indexes (guard_double_*) allow at most one row per key while in the given state.
  workload_indexes = [
    [%i[priority id scheduled_at], {where: "state = 'enqueued'", name: :gouda_checkout_all_index}],
    [%i[id last_execution_heartbeat_at], {where: "state = 'executing'", name: :gouda_last_heartbeat_index}],
    [%i[enqueue_concurrency_key], {where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue}],
    [%i[scheduler_key], {where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule}],
    [%i[execution_concurrency_key], {where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec}],
    [%i[active_job_id], {name: :same_job_display_idx}],
    [%i[priority], {order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx}],
    [%i[last_execution_heartbeat_at], {name: :index_gouda_workloads_on_last_execution_heartbeat_at}],
    [%i[scheduler_key], {name: :index_gouda_workloads_on_scheduler_key}]
  ]
  workload_indexes.each do |columns, options|
    schema.add_index :gouda_workloads, columns, **options
  end

  schema.create_table(:gouda_job_fuses, id: false) do |table|
    table.string :active_job_class_name, null: false

    table.timestamps
  end
end

.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object

An equivalent method exists in edge Rails, so this one can probably be replaced later: https://github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d



34
35
36
37
38
39
40
41
42
43
# File 'lib/gouda/bulk.rb', line 34

# Enqueues a batch of ActiveJob instances, grouping them by the queue adapter of each job's class.
#
# `nil` entries are skipped. For each adapter, jobs are enqueued in their original relative order:
# - if the adapter responds to `enqueue_all`, the whole group is passed in one call;
# - otherwise each job is passed to `enqueue` one at a time.
#
# @param active_jobs [Array<ActiveJob::Base, nil>] the jobs to enqueue
# @return [Hash{Object => Array<ActiveJob::Base>}] the jobs grouped by adapter
def self.enqueue_jobs_via_their_adapters(active_jobs)
  jobs_per_adapter = active_jobs.compact.group_by { |job| job.class.queue_adapter }
  # Named `adapter_jobs` (not `active_jobs`) so the block parameter does not shadow the method argument.
  jobs_per_adapter.each_pair do |adapter, adapter_jobs|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(adapter_jobs)
    else
      adapter_jobs.each { |job| adapter.enqueue(job) }
    end
  end
end

.in_bulk(&blk) ⇒ Object

Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) are buffered. The call is reentrant, so `in_bulk` calls can be nested arbitrarily; only the outermost call flushes the buffer. At the end of the outermost block, the buffered jobs are enqueued using their respective adapters. If an adapter supports `enqueue_all` (as the Sidekiq adapter does in recent Rails releases, for example), that method is used. If the block raises, the buffered jobs are not enqueued. This method is especially useful for mass emails or maintenance tasks where a large number of jobs are enqueued at once.

Examples:

Gouda.in_bulk do
  User.recently_joined.find_each do |recently_joined_user|
    GreetingJob.perform_later(recently_joined_user)
  end
end

Returns:

  • (Object)

    the return value of the block



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/gouda/bulk.rb', line 20

# Buffers jobs enqueued inside the block and enqueues them together when the outermost
# `in_bulk` block finishes.
#
# The buffer lives in `Thread.current[:gouda_bulk_buffer]` (Thread#[] is fiber-local). A
# nested call sees an existing buffer and simply yields; the outermost call owns the buffer.
# If the block raises, the buffer is reset, the buffered jobs are discarded, and the
# exception propagates.
#
# @return [Object] the return value of the block
def self.in_bulk(&blk)
  # Nested call: an outer in_bulk already owns the buffer and will flush it.
  return yield unless Thread.current[:gouda_bulk_buffer].nil?

  Thread.current[:gouda_bulk_buffer] = []
  begin
    retval = yield
    buffered_jobs = Thread.current[:gouda_bulk_buffer]
  ensure
    # Always reset, even when the block raised. Otherwise a stale [] would make every later
    # in_bulk call here look nested, and its jobs would be buffered but never enqueued.
    Thread.current[:gouda_bulk_buffer] = nil
  end

  # The buffer is cleared before enqueueing, as in the original ordering.
  # NOTE(review): presumably so adapters enqueue for real instead of re-buffering — confirm.
  enqueue_jobs_via_their_adapters(buffered_jobs)
  retval
end

.instrument(channel, options) ⇒ Object



115
116
117
# File 'lib/gouda.rb', line 115

# Publishes an ActiveSupport::Notifications event named "<channel>.gouda".
#
# @param channel [#to_s] event name prefix
# @param options [Hash] payload passed to subscribers
# @return [Object] whatever ActiveSupport::Notifications.instrument returns (the block's value when a block is given)
def self.instrument(channel, options, &block)
  event_name = "#{channel}.gouda"
  ActiveSupport::Notifications.instrument(event_name, options, &block)
end

.logger ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/gouda.rb', line 82

# Returns the logger Gouda writes to, tagged with "Gouda" when the logger supports tagging.
#
# Precedence: Rails.logger, then ActiveJob::Base.logger, then a memoized fallback.
#
# @return [Logger] a tagged logger, or the configured logger itself if it has no #tagged
def self.logger
  # Fallback for when neither Rails nor ActiveJob provides a logger yet (`Rails.logger` only
  # becomes available later in the Rails lifecycle). It writes to $stdout and only emits
  # WARN and above.
  @fallback_gouda_logger ||= ActiveSupport::TaggedLogging.new(Logger.new($stdout)).tap do |logger|
    logger.level = Logger::WARN
  end

  # We want the Rails-configured loggers to take precedence over ours. Gouda is, in the end,
  # just an ActiveJob adapter, and the Workload is just an ActiveRecord model. So it should
  # be up to the app developer, not to us, to set up and configure the logger.
  #
  # Some gems (e.g. "stackdriver" from Google) replace the Rails logger with their own. If
  # the user chooses to do that, we honor that choice. The same goes for the logging level,
  # where the Rails logger level must take precedence, and for logger broadcasts, which the
  # Rails console sets up when you start it, for example.
  logger_to_use = Rails.try(:logger) || ActiveJob::Base.try(:logger) || @fallback_gouda_logger

  # An app- or gem-assigned logger need not be an ActiveSupport::TaggedLogging; a plain
  # ::Logger has no #tagged. Return such a logger untagged instead of raising NoMethodError.
  return logger_to_use unless logger_to_use.respond_to?(:tagged)

  # Tag every line so callers don't have to prefix messages manually.
  logger_to_use.tagged("Gouda")
end

.parse_queue_constraint(queue_constraint_str) ⇒ Object

Parses a string describing a set of queues into a queue constraint. Note that this works similarly to good_job; for example, the constraints do not necessarily compose well.

Examples:

Gouda.parse_queue_constraint('-queue1,queue2')
=> an ExceptQueueConstraint excluding 'queue1' and 'queue2'

Parameters:

  • queue_constraint_str (String)

    Queue string

Returns:

  • (Object)

    The queue constraint to apply:

    • Gouda::AnyQueue when the string is blank or its list contains "*" (all queues match).

    • An ExceptQueueConstraint when the string starts with "-" (the listed queue names should not match).

    • An OnlyQueuesConstraint otherwise, with an optional leading "+" (only the listed queue names should match).



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/gouda/queue_constraints.rb', line 56

# Turns a queue-selection string (e.g. "queue1,queue2", "-queue1", "+queue1", "*")
# into a queue constraint.
#
# A blank or nil string is treated as "*". A leading "-" selects an exclusion
# constraint; a leading "+" is accepted and ignored. Names are comma-separated and
# stripped of surrounding whitespace. Any "*" in the list yields AnyQueue, even
# with a "-" prefix.
#
# @param queue_constraint_str [String, nil]
# @return [AnyQueue, ExceptQueueConstraint, OnlyQueuesConstraint]
def self.parse_queue_constraint(queue_constraint_str)
  spec = queue_constraint_str.presence || "*"

  exclusion = spec.start_with?("-")
  spec = spec[1..] if exclusion || spec.start_with?("+")

  queue_names = spec.split(",").map { |name| name.strip }
  return AnyQueue if queue_names.include?("*")

  # NOTE(review): the name list is wrapped in an extra array before being handed to the
  # constraint constructors — confirm those constructors expect this nesting.
  if exclusion
    ExceptQueueConstraint.new([queue_names])
  else
    OnlyQueuesConstraint.new([queue_names])
  end
end

.setup_fiber_environment ⇒ Object



161
162
163
164
# File 'lib/gouda.rb', line 161

# Prepares the process for fiber-based execution.
#
# Currently this only asks FiberDatabaseSupport to check the Rails isolation level
# configuration. Per the original note, the check is non-destructive.
#
# @return [Object] the result of FiberDatabaseSupport.check_fiber_isolation_level
def self.setup_fiber_environment
  fiber_support = Gouda::FiberDatabaseSupport
  fiber_support.check_fiber_isolation_level
end

.start ⇒ Object



47
48
49
# File 'lib/gouda.rb', line 47

# Entry point for running workers; delegates to .start_with_scheduler_type and returns its result.
def self.start = start_with_scheduler_type

.start_with_scheduler_type ⇒ Object

Enhanced start method that chooses between thread and thread+fiber execution



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/gouda.rb', line 52

# Logs the Gouda version and runs the worker loop, in either thread-only or
# thread+fiber mode depending on Gouda.config.use_fiber_scheduler.
#
# If the GOUDA_QUEUES environment variable is set, it is parsed with
# Gouda.parse_queue_constraint; otherwise Gouda::AnyQueue is used.
#
# @return [Object] whatever Worker.worker_loop returns
def self.start_with_scheduler_type
  queues_from_env = ENV["GOUDA_QUEUES"]
  queue_constraint = queues_from_env ? Gouda.parse_queue_constraint(queues_from_env) : Gouda::AnyQueue

  logger.info("Gouda version: #{Gouda::VERSION}")

  settings = Gouda.config
  use_fibers = settings.use_fiber_scheduler
  # fibers_per_thread is forced to 1 when the fiber scheduler is disabled.
  fibers_per_thread =
    if use_fibers
      settings.fibers_per_thread
    else
      1
    end

  # One worker-loop call covers both execution modes.
  Worker.worker_loop(
    n_threads: settings.worker_thread_count,
    queue_constraint: queue_constraint,
    use_fibers: use_fibers,
    fibers_per_thread: fibers_per_thread
  )
end

.suppressing_sql_logs ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/gouda.rb', line 101

# Runs the block with Gouda::Workload's logger silenced below INFO.
#
# This is meant for frequently called methods that poll the DB. At DEBUG level, every
# poll would print its SQL, which is noisy (especially in a side-thread inside Puma).
# Silencing DEBUG-level messages drops the SQL while keeping INFO and above.
#
# @return [Object] the block's return value (or whatever Logger#silence returns)
def self.suppressing_sql_logs(&block)
  workload_logger = Gouda::Workload.logger

  # In tests (and early in the Rails boot cycle) the ActiveRecord logger may be nil;
  # just run the block unsilenced then.
  return yield unless workload_logger

  workload_logger.silence(Logger::INFO, &block)
end

.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1) ⇒ Object

Module-level convenience method that delegates to Worker.worker_loop



273
274
275
276
277
278
279
280
281
# File 'lib/gouda/worker.rb', line 273

# Module-level convenience wrapper that forwards every option unchanged to Worker.worker_loop.
#
# @param n_threads [Integer] number of worker threads
# @param check_shutdown [#call] shutdown check; a fresh TrapShutdownCheck by default
# @param queue_constraint [Object] which queues to pick work from; Gouda::AnyQueue by default
# @param use_fibers [Boolean] whether to run jobs on fibers within each thread
# @param fibers_per_thread [Integer] fibers per thread when use_fibers is true
# @return [Object] whatever Worker.worker_loop returns
def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1)
  loop_options = {
    n_threads: n_threads,
    check_shutdown: check_shutdown,
    queue_constraint: queue_constraint,
    use_fibers: use_fibers,
    fibers_per_thread: fibers_per_thread
  }
  Worker.worker_loop(**loop_options)
end