Module: Sidekiq::AssuredJobs

Defined in:
lib/sidekiq-assured-jobs.rb,
lib/sidekiq/assured_jobs/web.rb,
lib/sidekiq/assured_jobs/worker.rb,
lib/sidekiq/assured_jobs/version.rb,
lib/sidekiq/assured_jobs/middleware.rb

Defined Under Namespace

Modules: Web, Worker
Classes: Error, Middleware

Constant Summary

VERSION =
"1.1.0"

Class Attribute Summary

Class Method Summary

Class Attribute Details

.auto_recovery_enabled ⇒ Object

Returns the value of attribute auto_recovery_enabled.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level auto_recovery_enabled setting.
# @return [Object] the value last assigned to @auto_recovery_enabled (nil if never assigned)
def auto_recovery_enabled; @auto_recovery_enabled; end

.delayed_recovery_count ⇒ Object

Returns the value of attribute delayed_recovery_count.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level delayed_recovery_count setting.
# @return [Object] the value last assigned to @delayed_recovery_count (nil if never assigned)
def delayed_recovery_count; @delayed_recovery_count; end

.delayed_recovery_interval ⇒ Object

Returns the value of attribute delayed_recovery_interval.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level delayed_recovery_interval setting.
# @return [Object] the value last assigned to @delayed_recovery_interval (nil if never assigned)
def delayed_recovery_interval; @delayed_recovery_interval; end

.heartbeat_interval ⇒ Object

Returns the value of attribute heartbeat_interval.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level heartbeat_interval setting.
# @return [Object] the value last assigned to @heartbeat_interval (nil if never assigned)
def heartbeat_interval; @heartbeat_interval; end

.heartbeat_ttl ⇒ Object

Returns the value of attribute heartbeat_ttl.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level heartbeat_ttl setting.
# @return [Object] the value last assigned to @heartbeat_ttl (nil if never assigned)
def heartbeat_ttl; @heartbeat_ttl; end

.instance_id ⇒ Object

Returns the value of attribute instance_id.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for this process's instance identifier.
# @return [Object] the value last assigned to @instance_id (nil if never assigned)
def instance_id; @instance_id; end

.logger ⇒ Object

Returns the value of attribute logger.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the logger used by every AssuredJobs log line.
# @return [Object] the value last assigned to @logger (nil if never assigned)
def logger; @logger; end

.namespace ⇒ Object

Returns the value of attribute namespace.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the Redis key prefix used by namespaced_key.
# @return [Object] the value last assigned to @namespace (nil if never assigned)
def namespace; @namespace; end

.recovery_lock_ttl ⇒ Object

Returns the value of attribute recovery_lock_ttl.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the module-level recovery_lock_ttl setting.
# @return [Object] the value last assigned to @recovery_lock_ttl (nil if never assigned)
def recovery_lock_ttl; @recovery_lock_ttl; end

.redis_options ⇒ Object

Returns the value of attribute redis_options.



26
27
28
# File 'lib/sidekiq-assured-jobs.rb', line 26

# Reader for the optional custom Redis connection options (see .redis).
# @return [Object] the value last assigned to @redis_options (nil if never assigned)
def redis_options; @redis_options; end

Class Method Details

.clear_unique_jobs_lock(job_data) ⇒ Object

Clear unique-jobs lock for orphaned jobs to allow immediate re-enqueuing



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/sidekiq-assured-jobs.rb', line 62

# Clear the sidekiq-unique-jobs lock held for an orphaned job so it can be
# re-enqueued immediately.
#
# The digest is read from 'unique_digest' (sidekiq-unique-jobs v6 payloads) or
# 'lock_digest' (v7+ payloads). Jobs carrying neither are left untouched.
# The lock is removed through whichever API the loaded gem exposes:
# - the v6 module function Digests.del(digest:)
# - the v7+ instance method Digests#delete_by_digest
# Verify against the installed sidekiq-unique-jobs version.
#
# Best effort: any error is logged as a warning and never raised, so a
# lock-cleanup failure cannot block recovery of the job itself.
#
# @param job_data [Hash] parsed Sidekiq job payload (string keys)
def clear_unique_jobs_lock(job_data)
  digest = job_data['unique_digest'] || job_data['lock_digest']
  return unless digest

  begin
    if defined?(SidekiqUniqueJobs::Digests)
      digests = SidekiqUniqueJobs::Digests
      if digests.respond_to?(:del)
        # sidekiq-unique-jobs v6: module-level API
        digests.del(digest: digest)
      else
        # sidekiq-unique-jobs v7+: Digests is a class with an instance API
        digests.new.delete_by_digest(digest)
      end
      logger.info "AssuredJobs cleared unique-jobs lock for job #{job_data['jid']} with digest #{digest}"
    else
      logger.debug "AssuredJobs: SidekiqUniqueJobs not available, skipping lock cleanup for job #{job_data['jid']}"
    end
  rescue => e
    logger.warn "AssuredJobs failed to clear unique-jobs lock for job #{job_data['jid']}: #{e.message}"
  end
end

.configure {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self — the Sidekiq::AssuredJobs module itself; assign settings on it inside the block.



28
29
30
31
32
# File 'lib/sidekiq-assured-jobs.rb', line 28

def configure
  yield self if block_given?
  setup_defaults
  setup_sidekiq_hooks
end

.get_instances_status ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/sidekiq-assured-jobs.rb', line 171

# Summarise every instance known to Redis, for the web UI.
#
# An instance is 'alive' if its instance:<id> heartbeat key exists. Its
# last_heartbeat is that key's value converted with Time.at, or nil when the
# value is missing. An instance that still owns a jobs:<id> tracking set but
# has no heartbeat key is 'dead'. Its entry also carries orphaned_job_count,
# the size of that set.
#
# @return [Hash{String => Hash}] instance id => status hash
def get_instances_status
  statuses = {}

  redis_sync do |conn|
    # Every instance with a heartbeat key counts as alive.
    conn.keys(namespaced_key("instance:*")).each do |heartbeat_key|
      raw_beat = conn.get(heartbeat_key)
      statuses[heartbeat_key.split(":").last] = {
        status: 'alive',
        last_heartbeat: (Time.at(raw_beat.to_f) if raw_beat)
      }
    end

    # A tracking set whose owner has no heartbeat key marks a dead instance.
    conn.keys(namespaced_key("jobs:*")).each do |tracking_key|
      owner = tracking_key.split(":").last
      next if statuses[owner]

      statuses[owner] = {
        status: 'dead',
        last_heartbeat: get_instance_last_heartbeat(owner, conn),
        orphaned_job_count: conn.scard(tracking_key)
      }
    end
  end

  statuses
end

.get_orphaned_job_by_jid(jid) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/sidekiq-assured-jobs.rb', line 203

# Look up a single tracked job by jid, for the web UI.
#
# Returns nil when no job:<jid> payload exists. Otherwise returns the parsed
# payload. If some jobs:<instance_id> tracking set contains the jid (the first
# match wins), the payload is annotated with 'instance_id', 'orphaned_at' and
# 'orphaned_duration'.
#
# NOTE(review): the owning instance is not checked for liveness, so a job
# still tracked by a live instance is returned as well.
#
# @param jid [String] Sidekiq job id
# @return [Hash, nil]
def get_orphaned_job_by_jid(jid)
  redis_sync do |conn|
    raw = conn.get(namespaced_key("job:#{jid}"))
    next nil unless raw

    job = JSON.parse(raw)

    # Find which instance's tracking set holds this jid.
    owner_key = conn.keys(namespaced_key("jobs:*")).find { |key| conn.sismember(key, jid) }
    if owner_key
      owner = owner_key.split(":").last
      job['instance_id'] = owner
      job['orphaned_at'] = get_instance_last_heartbeat(owner, conn)
      job['orphaned_duration'] = calculate_orphaned_duration(job['orphaned_at'])
    end

    job
  end
end

.get_orphaned_jobs_info ⇒ Object

Web interface support methods



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/sidekiq-assured-jobs.rb', line 135

# List every orphaned job for the web UI.
#
# A job is orphaned when it sits in a jobs:<instance_id> tracking set whose
# instance has no instance:<instance_id> heartbeat key. Each returned payload
# is the parsed job:<jid> value plus three annotations:
# - 'instance_id'
# - 'orphaned_at' (from get_instance_last_heartbeat)
# - 'orphaned_duration'
# Tracked jids with no stored payload are skipped. The result is ordered most
# recently orphaned first.
#
# @return [Array<Hash>]
def get_orphaned_jobs_info
  orphaned_jobs = []

  redis_sync do |conn|
    # Get all job keys and instance keys
    job_keys = conn.keys(namespaced_key("jobs:*"))
    instance_keys = conn.keys(namespaced_key("instance:*"))

    # Extract live instance IDs
    live_instances = instance_keys.map { |key| key.split(":").last }.to_set

    job_keys.each do |job_key|
      instance_id = job_key.split(":").last
      next if live_instances.include?(instance_id)

      conn.smembers(job_key).each do |jid|
        job_payload = conn.get(namespaced_key("job:#{jid}"))
        next unless job_payload

        job_data = JSON.parse(job_payload)
        job_data['instance_id'] = instance_id
        job_data['orphaned_at'] = get_instance_last_heartbeat(instance_id, conn)
        job_data['orphaned_duration'] = calculate_orphaned_duration(job_data['orphaned_at'])
        orphaned_jobs << job_data
      end
    end
  end

  # orphaned_at can be nil (no recorded heartbeat) next to non-nil values; the
  # old `|| 0` key made sort_by compare e.g. Time with Integer and raise
  # ArgumentError. A numeric key keeps the ordering and treats nil as oldest.
  # NOTE(review): assumes get_instance_last_heartbeat returns a Time, a
  # numeric epoch or nil — confirm.
  orphaned_jobs.sort_by { |job| job['orphaned_at'].to_f }.reverse
end

.namespaced_key(key) ⇒ Object

Helper method to add namespace prefix to Redis keys



57
58
59
# File 'lib/sidekiq-assured-jobs.rb', line 57

# Prefix a Redis key with the configured namespace, joined by ":".
#
# @param key [#to_s] key suffix, e.g. "jobs:<instance_id>"
# @return [String] "<namespace>:<key>"
def namespaced_key(key)
  format('%<prefix>s:%<suffix>s', prefix: namespace, suffix: key)
end

.redis(&block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq-assured-jobs.rb', line 34

# Run Redis commands against either a dedicated client or Sidekiq's pool.
#
# When redis_options is set, a new Redis client is built from it for this
# call:
# - With a block, the client is yielded and the block's value is returned.
#   The client is always closed afterwards, also when the block raises.
#   Previously an exception leaked the connection.
# - Without a block, the open client is returned and the caller must close it.
#
# When redis_options is unset, the call (block included) is delegated to
# Sidekiq.redis.
def redis(&block)
  # Use Sidekiq's Redis connection pool unless a custom configuration exists.
  return Sidekiq.redis(&block) unless redis_options

  redis_client = Redis.new(redis_options)
  return redis_client unless block_given?

  begin
    yield redis_client
  ensure
    redis_client.close
  end
end

.redis_sync(&block) ⇒ Object



51
52
53
54
# File 'lib/sidekiq-assured-jobs.rb', line 51

# Synchronous Redis access used by every lookup/cleanup in this module.
# Pure delegation to .redis: a custom client when redis_options is set,
# Sidekiq's pool otherwise. The block (if any) is forwarded unchanged.
def redis_sync(&callback)
  redis(&callback)
end

.reenqueue_orphans! ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sidekiq-assured-jobs.rb', line 78

# Re-enqueue jobs left behind by instances whose heartbeat key is gone.
#
# Runs inside with_recovery_lock. Every jobs:<instance_id> tracking set whose
# instance has no instance:<instance_id> key is treated as orphaned, and each
# stored job:<jid> payload in it is pushed back to Sidekiq.
#
# A job's payload and tracking entry are deleted only AFTER its push
# succeeds. Failures are handled per job and logged:
# - If a push fails or a payload cannot be parsed, that job stays tracked and
#   is retried on the next recovery run, instead of being silently dropped
#   together with every job after it.
# - Tracking entries whose payload no longer exists are pruned. Redis removes
#   the set once it is empty.
#
# Unexpected errors are logged, not raised.
def reenqueue_orphans!
  with_recovery_lock do
    logger.info "AssuredJobs starting orphan job recovery"

    redis_sync do |conn|
      # Get all job keys and instance keys (using custom namespacing)
      job_keys = conn.keys(namespaced_key("jobs:*"))
      instance_keys = conn.keys(namespaced_key("instance:*"))

      # Extract instance IDs from keys
      live_instances = instance_keys.map { |key| key.split(":").last }.to_set

      # Each entry: [tracking set key, jid, payload key, raw payload]
      orphaned_jobs = []

      job_keys.each do |job_key|
        next if live_instances.include?(job_key.split(":").last)

        conn.smembers(job_key).each do |jid|
          job_data_key = namespaced_key("job:#{jid}")
          job_payload = conn.get(job_data_key)

          if job_payload
            orphaned_jobs << [job_key, jid, job_data_key, job_payload]
          else
            # Nothing left to recover for this jid; drop the stale entry.
            conn.srem(job_key, jid)
          end
        end
      end

      if orphaned_jobs.empty?
        logger.info "AssuredJobs found no orphaned jobs"
      else
        logger.info "AssuredJobs found #{orphaned_jobs.size} orphaned jobs, re-enqueuing"
        orphaned_jobs.each do |job_key, jid, job_data_key, job_payload|
          begin
            job_data = JSON.parse(job_payload)

            # Clear unique-jobs lock before re-enqueuing to avoid lock conflicts
            clear_unique_jobs_lock(job_data)
            Sidekiq::Client.push(job_data)

            # Only forget the job once it is back in a queue.
            conn.del(job_data_key)
            conn.srem(job_key, jid)
            logger.info "AssuredJobs re-enqueued job #{job_data['jid']} (#{job_data['class']})"
          rescue => e
            logger.error "AssuredJobs failed to re-enqueue orphaned job #{jid}, leaving it for the next recovery run: #{e.message}"
          end
        end
      end
    end
  end
rescue => e
  logger.error "AssuredJobs orphan recovery failed: #{e.message}"
  logger.error e.backtrace.join("\n")
end

.setup_sidekiq_hooks ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/sidekiq-assured-jobs.rb', line 228

# Register this gem's Sidekiq server-side integration:
#   * the AssuredJobs::Middleware server middleware,
#   * a :startup hook (ensure defaults, start the heartbeat, optionally run
#     orphan recovery on a background thread),
#   * a :shutdown hook (stop the heartbeat thread, delete this instance's
#     heartbeat key, log but keep its tracked jobs).
#
# Does nothing when Sidekiq::VERSION is not defined. Only
# Sidekiq.configure_server is used.
#
# NOTE(review): .configure calls this on every invocation, so calling
# .configure more than once registers the :startup and :shutdown hooks again
# (how chain.add treats a duplicate middleware class depends on Sidekiq).
def setup_sidekiq_hooks
  return unless defined?(Sidekiq::VERSION)

  Sidekiq.configure_server do |config|
    config.server_middleware do |chain|
      chain.add AssuredJobs::Middleware
    end

    # Add startup hook for heartbeat and orphan recovery
    config.on(:startup) do
      # Ensure configuration is set up
      setup_defaults unless @instance_id

      logger.info "AssuredJobs starting up on instance #{instance_id}"

      # Start heartbeat system
      setup_heartbeat

      # Run orphan recovery on startup only (if enabled)
      if auto_recovery_enabled
        # Recovery runs on its own thread so the startup hook returns at once.
        Thread.new do
          sleep 5 # Give the server a moment to fully start
          begin
            reenqueue_orphans!
            # Reached only if reenqueue_orphans! returns without raising.
            spinup_delayed_recovery_thread
          rescue => e
            logger.error "AssuredJobs startup orphan recovery failed: #{e.message}"
            logger.error e.backtrace.join("\n")
          end
        end
      else
        logger.info "AssuredJobs auto-recovery is disabled"
      end
    end

    # Add shutdown hook to clean up
    config.on(:shutdown) do
      logger.info "AssuredJobs shutting down instance #{instance_id}"
      begin
        # Stop heartbeat thread
        # (presumably started by setup_heartbeat — confirm)
        if @heartbeat_thread&.alive?
          @heartbeat_thread.kill
          @heartbeat_thread = nil
        end

        redis_sync do |conn|
          # Only clean up instance heartbeat - let orphan recovery handle job cleanup
          # This ensures that if there are running jobs during shutdown, they will be
          # detected as orphaned and recovered by the next instance
          # NOTE(review): once this key is gone, reenqueue_orphans! in any other
          # process treats this instance's jobs:<id> set as orphaned, even while
          # those jobs may still be finishing; recovery in that window could
          # enqueue them a second time.
          conn.del(namespaced_key("instance:#{instance_id}"))

          # Log tracked jobs but don't clean them up - they should be recovered as orphans
          job_tracking_key = namespaced_key("jobs:#{instance_id}")
          tracked_jobs = conn.smembers(job_tracking_key)

          if tracked_jobs.any?
            logger.warn "AssuredJobs leaving #{tracked_jobs.size} tracked jobs for orphan recovery: #{tracked_jobs.join(', ')}"
            logger.info "AssuredJobs: These jobs will be recovered by the next instance startup"
          else
            logger.info "AssuredJobs: No tracked jobs to leave for recovery"
          end
        end
      rescue => e
        # Shutdown cleanup is best effort: failures are logged, not raised.
        logger.error "AssuredJobs shutdown cleanup failed: #{e.message}"
      end
    end
  end
end