Module: Gouda
Defined in:
- lib/gouda.rb
- lib/gouda/bulk.rb
- lib/gouda/worker.rb
- lib/gouda/railtie.rb
- lib/gouda/version.rb
- lib/gouda/queue_constraints.rb
- lib/generators/gouda/install_generator.rb
- lib/gouda/active_job_extensions/interrupts.rb
- lib/gouda/active_job_extensions/concurrency.rb
Defined Under Namespace
Modules: ActiveJobExtensions, AnyQueue, BulkAdapterExtension, FiberDatabaseSupport, Scheduler
Classes: Adapter, CombinedShutdownCheck, ConcurrencyExceededError, Configuration, EmptyQueueShutdownCheck, ExceptQueueConstraint, InstallGenerator, InterruptError, JobFuse, OnlyQueuesConstraint, Railtie, ThreadSafeSet, TimerShutdownCheck, TrapShutdownCheck, Worker, Workload
Constant Summary
- POLL_INTERVAL_DURATION_SECONDS = 1
- UNINITIALISED_DATABASE_EXCEPTIONS = [ActiveRecord::NoDatabaseError, ActiveRecord::StatementInvalid, ActiveRecord::ConnectionNotEstablished]
- VERSION = "0.2.0"
Class Method Summary
- .config ⇒ Object
- .configure {|config| ... } ⇒ Object
- .create_tables(active_record_schema) ⇒ Object
- .enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
  This method exists in edge Rails, so it can probably be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d.
- .in_bulk(&blk) ⇒ Object
  Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) will be buffered.
- .instrument(channel, options) ⇒ Object
- .logger ⇒ Object
- .parse_queue_constraint(queue_constraint_str) ⇒ Hash
  Parse a string representing a group of queues into a queue constraint. Note that this works similarly to good_job.
- .setup_fiber_environment ⇒ Object
- .start ⇒ Object
- .start_with_scheduler_type ⇒ Object
  Enhanced start method that chooses between thread and thread+fiber execution.
- .suppressing_sql_logs ⇒ Object
- .worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1) ⇒ Object
  Module-level convenience method that delegates to Worker.worker_loop.
Class Method Details
.config ⇒ Object
# File 'lib/gouda.rb', line 74

def self.config
  @config ||= Configuration.new
end
.configure {|config| ... } ⇒ Object
# File 'lib/gouda.rb', line 78

def self.configure
  yield config
end
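The two methods above form a memoized configuration object plus a block-style setter. The pattern can be sketched in isolation as follows. `DemoQueue` and its `Configuration` class are hypothetical stand-ins, not Gouda's own classes; the three attribute names are the ones read from `Gouda.config` in `.start_with_scheduler_type`, and the default values are assumptions made up for this sketch.

```ruby
# Hypothetical stand-in module mirroring the memoize-and-yield pattern of
# Gouda.config / Gouda.configure. This is not Gouda's own Configuration class.
module DemoQueue
  class Configuration
    # Attribute names borrowed from the Gouda.config reads in this document.
    attr_accessor :worker_thread_count, :use_fiber_scheduler, :fibers_per_thread

    def initialize
      # Default values are assumptions for illustration only.
      @worker_thread_count = 1
      @use_fiber_scheduler = false
      @fibers_per_thread = 1
    end
  end

  # Build the configuration once and reuse it on every later call.
  def self.config
    @config ||= Configuration.new
  end

  # Hand the shared configuration object to the caller's block.
  def self.configure
    yield config
  end
end

DemoQueue.configure do |config|
  config.worker_thread_count = 4
  config.use_fiber_scheduler = true
end
```

Because `config` is memoized, every `configure` block mutates the same object, so settings from several initializers accumulate rather than replace each other.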
.create_tables(active_record_schema) ⇒ Object
# File 'lib/gouda.rb', line 119

def self.create_tables(active_record_schema)
  active_record_schema.create_enum :gouda_workload_state, %w[enqueued executing finished]
  active_record_schema.create_table :gouda_workloads, id: :uuid do |t|
    t.uuid :active_job_id, null: false
    t. :scheduled_at, null: false
    t. :execution_started_at
    t. :execution_finished_at
    t. :last_execution_heartbeat_at
    t. :interrupted_at, null: true

    t.string :scheduler_key, null: true
    t.string :queue_name, null: false, default: "default"
    t.integer :priority
    t.string :active_job_class_name, null: false
    t.jsonb :serialized_params
    t.jsonb :error, default: {}, null: false
    t.enum :state, enum_type: :gouda_workload_state, default: "enqueued", null: false
    t.string :execution_concurrency_key
    t.string :enqueue_concurrency_key
    t.string :executing_on
    t.integer :position_in_bulk

    t.
  end

  active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index
  active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index
  active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule
  active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec
  active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx
  active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx
  active_record_schema.add_index :gouda_workloads, [:last_execution_heartbeat_at], name: :index_gouda_workloads_on_last_execution_heartbeat_at
  active_record_schema.add_index :gouda_workloads, [:scheduler_key], name: :index_gouda_workloads_on_scheduler_key

  active_record_schema.create_table :gouda_job_fuses, id: false do |t|
    t.string :active_job_class_name, null: false
    t.
  end
end
.enqueue_jobs_via_their_adapters(active_jobs) ⇒ Object
This method exists in edge Rails, so it can probably be replaced later: github.com/rails/rails/commit/9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d
# File 'lib/gouda/bulk.rb', line 34

def self.enqueue_jobs_via_their_adapters(active_jobs)
  jobs_per_adapter = active_jobs.compact.group_by { |aj| aj.class.queue_adapter }
  jobs_per_adapter.each_pair do |adapter, active_jobs|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(active_jobs)
    else
      active_jobs.each { |aj| adapter.enqueue(aj) }
    end
  end
end
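The grouping and fallback logic above can be tried without Rails. The sketch below is only an illustration: the adapter and job classes are hypothetical stand-ins that record what they receive, and `enqueue_via_adapters` is a renamed copy of the same idea rather than Gouda's method.

```ruby
# Hypothetical adapter that supports batch enqueueing.
class BulkCapableAdapter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def enqueue(job)
    @calls << [:enqueue, [job.name]]
  end

  def enqueue_all(jobs)
    @calls << [:enqueue_all, jobs.map(&:name)]
  end
end

# Hypothetical adapter that can only enqueue one job at a time.
class SingleOnlyAdapter
  attr_reader :calls

  def initialize
    @calls = []
  end

  def enqueue(job)
    @calls << [:enqueue, [job.name]]
  end
end

# Hypothetical job base class with a per-class queue_adapter setting.
class DemoJob
  class << self
    attr_accessor :queue_adapter
  end

  attr_reader :name

  def initialize(name)
    @name = name
  end
end

class MailJob < DemoJob; end
class ReportJob < DemoJob; end

# Group jobs by their class's adapter, then batch where the adapter allows it.
def enqueue_via_adapters(jobs)
  jobs.compact.group_by { |job| job.class.queue_adapter }.each_pair do |adapter, batch|
    if adapter.respond_to?(:enqueue_all)
      adapter.enqueue_all(batch)
    else
      batch.each { |job| adapter.enqueue(job) }
    end
  end
end

bulk = BulkCapableAdapter.new
single = SingleOnlyAdapter.new
MailJob.queue_adapter = bulk
ReportJob.queue_adapter = single

enqueue_via_adapters([MailJob.new("a"), nil, ReportJob.new("b"), MailJob.new("c"), ReportJob.new("d")])
```

In this run the batch-capable adapter receives one `enqueue_all` call with both mail jobs, the other adapter receives one `enqueue` call per report job, and the `nil` entry is dropped by `compact`.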
.in_bulk(&blk) ⇒ Object
Inside this call, all `perform_later` calls on ActiveJob subclasses (including mailers) will be buffered. The call is reentrant, so you can have multiple `in_bulk` calls with arbitrary nesting. At the end of the outermost block, the buffered jobs will be enqueued using their respective adapters. If an adapter supports `enqueue_all` (Sidekiq does in recent releases of Rails, for example), this functionality will be used. This method is especially useful for mass emails or maintenance tasks where a large swath of jobs gets enqueued at once.
# File 'lib/gouda/bulk.rb', line 20

def self.in_bulk(&blk)
  if Thread.current[:gouda_bulk_buffer].nil?
    Thread.current[:gouda_bulk_buffer] = []
    retval = yield
    buf, Thread.current[:gouda_bulk_buffer] = Thread.current[:gouda_bulk_buffer], nil
    enqueue_jobs_via_their_adapters(buf)
    retval
  else # There already is an open bulk
    yield
  end
end
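The reentrancy described above comes from a single thread-local buffer that only the outermost block creates and flushes. A minimal self-contained sketch of that idea follows. `DemoBulk`, its `push` method (standing in for `perform_later`) and the `FLUSHED` list (standing in for the adapters) are all hypothetical names introduced for this illustration.

```ruby
# Hypothetical stand-in for the buffering idea behind Gouda.in_bulk.
module DemoBulk
  # Each entry is one batch handed over at the end of an outermost block.
  FLUSHED = []

  # Stand-in for perform_later: buffer inside a bulk block, hand over at once otherwise.
  def self.push(job)
    buffer = Thread.current[:demo_bulk_buffer]
    if buffer
      buffer << job
    else
      FLUSHED << [job]
    end
  end

  def self.in_bulk
    if Thread.current[:demo_bulk_buffer].nil?
      Thread.current[:demo_bulk_buffer] = []
      retval = yield
      # Swap the buffer out and clear the thread-local slot in one step.
      buf, Thread.current[:demo_bulk_buffer] = Thread.current[:demo_bulk_buffer], nil
      FLUSHED << buf
      retval
    else
      yield # A bulk is already open on this thread, so keep filling its buffer
    end
  end
end

result = DemoBulk.in_bulk do
  DemoBulk.push(:welcome_email)
  DemoBulk.in_bulk { DemoBulk.push(:digest_email) }
  :done
end
DemoBulk.push(:standalone)
```

The nested call adds to the buffer opened by the outer call, so both jobs leave in a single batch, and the block's own return value is passed through unchanged.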
.instrument(channel, options) ⇒ Object
# File 'lib/gouda.rb', line 115

def self.instrument(channel, options, &)
  ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &)
end
.logger ⇒ Object
# File 'lib/gouda.rb', line 82

def self.logger
  # By default, fall back to a logger that writes to $stdout at WARN level. The `Rails.logger`
  # method only becomes available later in the Rails lifecycle.
  @fallback_gouda_logger ||= ActiveSupport::TaggedLogging.new(Logger.new($stdout)).tap do |logger|
    logger.level = Logger::WARN
  end

  # We want the Rails-configured loggers to take precedence over ours, since Gouda
  # is just an ActiveJob adapter and the Workload is just an ActiveRecord, in the end.
  # So it should be up to the developer of the app, not to us, to set the logger up
  # and configure it. There are also gems such as "stackdriver" from Google which
  # rather unceremoniously overwrite the Rails logger with their own. If that happens,
  # it is the choice of the user to do so - and we should honor that choice. Same for
  # the logging level - the Rails logger level must take precedence. Same for logger
  # broadcasts which get set up, for example, by the Rails console when you start it.
  logger_to_use = Rails.try(:logger) || ActiveJob::Base.try(:logger) || @fallback_gouda_logger
  logger_to_use.tagged("Gouda") # So that we don't have to manually prefix/tag on every call
end
.parse_queue_constraint(queue_constraint_str) ⇒ Hash
Parse a string representing a group of queues into a queue constraint. Note that this works similarly to good_job. For example, the constraints do not necessarily compose all that well.
# File 'lib/gouda/queue_constraints.rb', line 56

def self.parse_queue_constraint(queue_constraint_str)
  string = queue_constraint_str.presence || "*"

  case string.first
  when "-"
    exclude_queues = true
    string = string[1..]
  when "+"
    string = string[1..]
  end

  queues = string.split(",").map(&:strip)

  if queues.include?("*")
    AnyQueue
  elsif exclude_queues
    ExceptQueueConstraint.new([queues])
  else
    OnlyQueuesConstraint.new([queues])
  end
end
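To see which strings map to which kind of constraint, the parsing rules above can be approximated in plain Ruby. This is a sketch under assumptions: `ANY_QUEUE`, `OnlyQueues` and `ExceptQueues` are hypothetical stand-ins for `Gouda::AnyQueue` and the two constraint classes, the blank check replaces ActiveSupport's `presence`, and the stand-in structs receive the flat list of names, which is a simplification made here.

```ruby
# Hypothetical stand-ins for Gouda::AnyQueue and the two constraint classes.
ANY_QUEUE = :any_queue
OnlyQueues = Struct.new(:queue_names)
ExceptQueues = Struct.new(:queue_names)

def parse_constraint(str)
  # A nil or blank string means "all queues", like `presence || "*"`.
  string = (str.nil? || str.strip.empty?) ? "*" : str

  exclude = false
  case string[0]
  when "-"
    exclude = true
    string = string[1..]
  when "+"
    string = string[1..]
  end

  queues = string.split(",").map(&:strip)

  # A wildcard anywhere in the list wins, even after a leading "-".
  return ANY_QUEUE if queues.include?("*")

  exclude ? ExceptQueues.new(queues) : OnlyQueues.new(queues)
end

only = parse_constraint("mail, reports")
except = parse_constraint("-slow,bulk")
wildcard_exclusion = parse_constraint("-*")
```

The last call shows one way the constraints do not compose: excluding the wildcard still yields the any-queue constraint, because the wildcard check runs before the exclusion flag is considered.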
.setup_fiber_environment ⇒ Object
# File 'lib/gouda.rb', line 161

def self.setup_fiber_environment
  # Check Rails isolation level configuration (non-destructive)
  Gouda::FiberDatabaseSupport.check_fiber_isolation_level
end
.start ⇒ Object
# File 'lib/gouda.rb', line 47

def self.start
  start_with_scheduler_type
end
.start_with_scheduler_type ⇒ Object
Enhanced start method that chooses between thread and thread+fiber execution
# File 'lib/gouda.rb', line 52

def self.start_with_scheduler_type
  queue_constraint = if ENV["GOUDA_QUEUES"]
    Gouda.parse_queue_constraint(ENV["GOUDA_QUEUES"])
  else
    Gouda::AnyQueue
  end

  logger.info("Gouda version: #{Gouda::VERSION}")

  # Determine execution parameters based on configuration
  use_fibers = Gouda.config.use_fiber_scheduler
  fibers_per_thread = use_fibers ? Gouda.config.fibers_per_thread : 1

  # Single worker loop call that handles both execution modes
  Worker.worker_loop(
    n_threads: Gouda.config.worker_thread_count,
    queue_constraint: queue_constraint,
    use_fibers: use_fibers,
    fibers_per_thread: fibers_per_thread
  )
end
.suppressing_sql_logs ⇒ Object
# File 'lib/gouda.rb', line 101

def self.suppressing_sql_logs(&)
  # This is used for frequently-called methods that poll the DB. If logging is done at a low level (DEBUG)
  # those methods print a lot of SQL into the logs, on every poll. While that is useful if
  # you collect SQL queries from the logs, in most cases - especially if this is used
  # in a side-thread inside Puma - the output might be quite annoying. So silence the
  # logger when we poll, but just to INFO. Omitting DEBUG-level messages gets rid of the SQL.
  if Gouda::Workload.logger
    Gouda::Workload.logger.silence(Logger::INFO, &)
  else
    # In tests (and at earlier stages of the Rails boot cycle) the global ActiveRecord logger may be nil
    yield
  end
end
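The effect of silencing to INFO can be demonstrated with the standard library alone. The helper below is a rough approximation, not the `silence` method used above: `with_raised_level` is a hypothetical name, and this version changes the level for every thread sharing the logger while the block runs.

```ruby
require "logger"
require "stringio"

# Stdlib-only approximation of a silence-style helper: raise the level for the
# duration of the block, then restore it. A nil logger just runs the block.
def with_raised_level(logger, level)
  return yield if logger.nil?

  previous = logger.level
  logger.level = level
  begin
    yield
  ensure
    logger.level = previous
  end
end

output = StringIO.new
logger = Logger.new(output)
logger.level = Logger::DEBUG

with_raised_level(logger, Logger::INFO) do
  logger.debug("SELECT * FROM gouda_workloads") # Dropped: below INFO
  logger.info("polling")                        # Kept: INFO passes
end
logger.debug("after the block")                 # Kept: level is DEBUG again
```

Only DEBUG output is suppressed inside the block, which is where SQL statements are typically logged, so INFO and higher messages from the polling code still reach the log.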
.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1) ⇒ Object
Module-level convenience method that delegates to Worker.worker_loop
# File 'lib/gouda/worker.rb', line 273

def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1)
  Worker.worker_loop(
    n_threads: n_threads,
    check_shutdown: check_shutdown,
    queue_constraint: queue_constraint,
    use_fibers: use_fibers,
    fibers_per_thread: fibers_per_thread
  )
end