Class: Chewy::Strategy::DelayedSidekiq::Scheduler

Inherits:
Object
Defined in:
lib/chewy/strategy/delayed_sidekiq/scheduler.rb

Constant Summary

DEFAULT_TTL =
60 * 60 * 24 # in seconds
DEFAULT_LATENCY =
10
DEFAULT_MARGIN =
2
DEFAULT_QUEUE =
'chewy'
KEY_PREFIX =
'chewy:delayed_sidekiq'
FALLBACK_FIELDS =
'all'
FIELDS_IDS_SEPARATOR =
';'
IDS_SEPARATOR =
','
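
The separator constants suggest how a batch of ids and the fields to update could be packed into a single string before it is stored in Redis. The following is a minimal sketch under that assumption, not the library's actual serializer: the helper names pack and unpack are hypothetical, and the ordering (ids first, then fields) is assumed.

```ruby
# Illustrative only: values copied from the constant summary above.
FALLBACK_FIELDS = 'all'
FIELDS_IDS_SEPARATOR = ';'
IDS_SEPARATOR = ','

# Hypothetical helper: join ids and fields into one string.
# Assumption: ids come first; FALLBACK_FIELDS stands for "no field restriction".
def pack(ids, fields = nil)
  fields = [FALLBACK_FIELDS] if fields.nil? || fields.empty?
  [ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR)
end

# Hypothetical inverse of pack: returns [ids, fields] as arrays of strings.
def unpack(payload)
  payload.split(FIELDS_IDS_SEPARATOR).map { |part| part.split(IDS_SEPARATOR) }
end

packed_all = pack([1, 2])
packed_some = pack([3], %w[name rating])
```

Using two distinct separators keeps the payload a flat string, which is convenient for storing as a Redis set member, as long as ids and field names never contain ';' or ','.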

Instance Method Summary

Constructor Details

#initialize(type, ids, options = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 25

def initialize(type, ids, options = {})
  @type = type
  @ids = ids
  @options = options
end

Instance Method Details

#postpone ⇒ Object

The diagram:

inputs:
  latency == 2
  reindex_time = Time.current

Parallel OR Sequential triggers of reindex:       | What is going on in reindex store (Redis):
----------------------------------------------------------------------------------------------------
                                                  |
process 1 (reindex_time):                         | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1]
  Scheduler.new(CitiesIndex, [1]).postpone        | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                  | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3)
                                                  | it will zpop chewy:delayed_sidekiq:timechunks up to score 1679347866 and reindex all ids stored under the popped keys:
                                                  |   chewy:delayed_sidekiq:CitiesIndex:1679347866
                                                  |
process 2 (reindex_time):                         | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2]
  Scheduler.new(CitiesIndex, [2]).postpone        | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                  | & do not schedule a new worker
                                                  |
process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [3]).postpone        | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                  | & do not schedule a new worker
                                                  |
process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [4]).postpone        | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4]
                                                  | chewy:delayed_sidekiq:timechunks = [
                                                  |   { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}
                                                  |   { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"}
                                                  | ]
                                                  | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3)
                                                  | it will zpop chewy:delayed_sidekiq:timechunks up to score 1679347868 and reindex all ids stored under the popped keys:
                                                  |   chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex),
                                                  |   chewy:delayed_sidekiq:CitiesIndex:1679347868
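
The diagram relies on calls that arrive close together landing in the same time chunk. One way such bucketing could work is sketched below. The helper chunk_for is hypothetical and the rounding rule is an assumption made for illustration, not necessarily how the scheduler computes at. The start time is also assumed, picked so the results match the scores in the diagram.

```ruby
# Hypothetical helper, not Chewy's API: round a Unix timestamp up to the
# end of its latency-sized chunk, so nearby calls share one bucket.
def chunk_for(time, latency)
  shifted = time + latency
  shifted - (shifted % latency)
end

reindex_time = 1_679_347_864 # assumed start time, chosen to match the diagram
latency = 2

first  = chunk_for(reindex_time, latency)                 # 1679347866
second = chunk_for(reindex_time + (latency - 1), latency) # 1679347866, same chunk
third  = chunk_for(reindex_time + (latency + 1), latency) # 1679347868, next chunk
```

Calls that map to the same chunk share one Redis key and one scheduled worker, which is what lets many small updates collapse into a single reindex.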



# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 67

def postpone
  ::Sidekiq.redis do |redis|
    # warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead
    if redis.respond_to?(:sadd?)
      redis.sadd?(timechunk_key, serialize_data)
    else
      redis.sadd(timechunk_key, serialize_data)
    end

    redis.expire(timechunk_key, ttl)

    unless redis.zrank(timechunks_key, timechunk_key)
      redis.zadd(timechunks_key, at, timechunk_key)
      redis.expire(timechunks_key, ttl)

      ::Sidekiq::Client.push(
        'queue' => sidekiq_queue,
        'at' => at + margin,
        'class' => Chewy::Strategy::DelayedSidekiq::Worker,
        'args' => [type_name, at]
      )
    end
  end
end
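
The control flow of postpone can be followed without Redis or Sidekiq by using an in-memory stand-in. The sketch below is illustrative only: FakeReindexStore and its attributes are hypothetical, the key format is taken from the diagram, and the default margin of 2 mirrors DEFAULT_MARGIN.

```ruby
require 'set'

# In-memory stand-in for the Redis structures used by #postpone.
# All names here are hypothetical; no Redis or Sidekiq is involved.
class FakeReindexStore
  attr_reader :chunks, :timechunks, :scheduled

  def initialize
    @chunks = Hash.new { |hash, key| hash[key] = Set.new } # timechunk key => ids
    @timechunks = {}                                       # timechunk key => score
    @scheduled = []                                        # worker run times
  end

  # Mirrors the control flow above: always add the ids, but register the
  # chunk and schedule a worker only the first time the chunk is seen.
  def postpone(index_name, ids, at:, margin: 2)
    key = "chewy:delayed_sidekiq:#{index_name}:#{at}"
    @chunks[key].merge(ids)
    return if @timechunks.key?(key)

    @timechunks[key] = at
    @scheduled << at + margin
  end
end

store = FakeReindexStore.new
store.postpone('CitiesIndex', [1], at: 1_679_347_866)
store.postpone('CitiesIndex', [2], at: 1_679_347_866)
store.postpone('CitiesIndex', [3], at: 1_679_347_866)
store.postpone('CitiesIndex', [4], at: 1_679_347_868)
```

After these four calls the stand-in holds two chunks and two scheduled run times, so only the first call for each chunk results in a worker.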