Class: Sidekiq::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/batch.rb,
lib/sidekiq/batch/status.rb,
lib/sidekiq/batch/version.rb,
lib/sidekiq/batch/callback.rb,
lib/sidekiq/batch/middleware.rb

Defined Under Namespace

Modules: Callback, Extension, Middleware Classes: NoBlockGivenError, Status

Constant Summary collapse

BID_EXPIRE_TTL =
2_592_000
VERSION =
'0.1.6'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(existing_bid = nil) ⇒ Batch

Returns a new instance of Batch.



17
18
19
20
21
22
23
24
# File 'lib/sidekiq/batch.rb', line 17

def initialize(existing_bid = nil)
  @bid = existing_bid || SecureRandom.urlsafe_base64(10)
  @existing = !(!existing_bid || existing_bid.empty?)  # Basically existing_bid.present?
  @initialized = false
  @created_at = Time.now.utc.to_f
  @bidkey = "BID-" + @bid.to_s
  @ready_to_queue = []
end

Instance Attribute Details

#bidObject (readonly)

Returns the value of attribute bid.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def bid
  @bid
end

#callback_queueObject

Returns the value of attribute callback_queue.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def callback_queue
  @callback_queue
end

#created_atObject (readonly)

Returns the value of attribute created_at.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def created_at
  @created_at
end

#descriptionObject

Returns the value of attribute description.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def description
  @description
end

Class Method Details

.cleanup_redis(bid) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/sidekiq/batch.rb', line 262

def cleanup_redis(bid)
  Sidekiq.logger.debug {"Cleaning redis of batch #{bid}"}
  Sidekiq.redis do |r|
    r.del(
      "BID-#{bid}",
      "BID-#{bid}-callbacks-complete",
      "BID-#{bid}-callbacks-success",
      "BID-#{bid}-failed",

      "BID-#{bid}-success",
      "BID-#{bid}-complete",
      "BID-#{bid}-jids",
    )
  end
end

.enqueue_callbacks(event, bid) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/sidekiq/batch.rb', line 203

def enqueue_callbacks(event, bid)
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event}"
  already_processed, _, callbacks, queue, parent_bid, callback_batch = Sidekiq.redis do |r|
    r.multi do
      r.hget(batch_key, event)
      r.hset(batch_key, event, true)
      r.smembers(callback_key)
      r.hget(batch_key, "callback_queue")
      r.hget(batch_key, "parent_bid")
      r.hget(batch_key, "callback_batch")
    end
  end

  return if already_processed == 'true'

  queue ||= "default"
  parent_bid = !parent_bid || parent_bid.empty? ? nil : parent_bid    # Basically parent_bid.blank?
  callback_args = callbacks.reduce([]) do |memo, jcb|
    cb = Sidekiq.load_json(jcb)
    memo << [cb['callback'], event, cb['opts'], bid, parent_bid]
  end

  opts = {"bid" => bid, "event" => event}

  # Run callback batch finalize synchronously
  if callback_batch
    # Extract opts from cb_args or use current
    # Pass in stored event as callback finalize is processed on complete event
    cb_opts = callback_args.first&.at(2) || opts

    Sidekiq.logger.debug {"Run callback batch bid: #{bid} event: #{event} args: #{callback_args.inspect}"}
    # Finalize now
    finalizer = Sidekiq::Batch::Callback::Finalize.new
    status = Status.new bid
    finalizer.dispatch(status, cb_opts)

    return
  end

  Sidekiq.logger.debug {"Enqueue callback bid: #{bid} event: #{event} args: #{callback_args.inspect}"}

  if callback_args.empty?
    # Finalize now
    finalizer = Sidekiq::Batch::Callback::Finalize.new
    status = Status.new bid
    finalizer.dispatch(status, opts)
  else
    # Otherwise finalize in sub batch complete callback
    cb_batch = self.new
    cb_batch.callback_batch = true
    Sidekiq.logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
    cb_batch.on(:complete, "Sidekiq::Batch::Callback::Finalize#dispatch", opts)
    cb_batch.jobs do
      push_callbacks callback_args, queue
    end
  end
end

.process_failed_job(bid, jid) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/sidekiq/batch.rb', line 149

def process_failed_job(bid, jid)
  _, pending, failed, children, complete, parent_bid = Sidekiq.redis do |r|
    r.multi do
      r.sadd("BID-#{bid}-failed", jid)

      r.hincrby("BID-#{bid}", "pending", 0)
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-complete")
      r.hget("BID-#{bid}", "parent_bid")

      r.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
    end
  end

  # if the batch failed, and has a parent, update the parent to show one pending and failed job
  if parent_bid
    Sidekiq.redis do |r|
      r.multi do
        r.hincrby("BID-#{parent_bid}", "pending", 1)
        r.sadd("BID-#{parent_bid}-failed", jid)
        r.expire("BID-#{parent_bid}-failed", BID_EXPIRE_TTL)
      end
    end
  end

  if pending.to_i == failed.to_i && children == complete
    enqueue_callbacks(:complete, bid)
  end
end

.process_successful_job(bid, jid) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/sidekiq/batch.rb', line 180

def process_successful_job(bid, jid)
  failed, pending, children, complete, success, total, parent_bid = Sidekiq.redis do |r|
    r.multi do
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "pending", -1)
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-complete")
      r.scard("BID-#{bid}-success")
      r.hget("BID-#{bid}", "total")
      r.hget("BID-#{bid}", "parent_bid")

      r.srem("BID-#{bid}-failed", jid)
      r.srem("BID-#{bid}-jids", jid)
      r.expire("BID-#{bid}", BID_EXPIRE_TTL)
    end
  end

  # if complete or successfull call complete callback (the complete callback may then call successful)
  if (pending.to_i == failed.to_i && children == complete) || (pending.to_i.zero? && children == success)
    enqueue_callbacks(:complete, bid)
  end
end

Instance Method Details

#callback_batch=(callback_batch) ⇒ Object



36
37
38
39
# File 'lib/sidekiq/batch.rb', line 36

def callback_batch=(callback_batch)
  @callback_batch = callback_batch
  persist_bid_attr('callback_batch', callback_batch)
end

#increment_job_queue(jid) ⇒ Object



110
111
112
# File 'lib/sidekiq/batch.rb', line 110

def increment_job_queue(jid)
  @ready_to_queue << jid
end

#invalidate_allObject



114
115
116
117
118
# File 'lib/sidekiq/batch.rb', line 114

def invalidate_all
  Sidekiq.redis do |r|
    r.setex("invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
  end
end

#jobsObject

Raises:



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/sidekiq/batch.rb', line 55

def jobs
  raise NoBlockGivenError unless block_given?

  bid_data, Thread.current[:bid_data] = Thread.current[:bid_data], []

  begin
    if !@existing && !@initialized
      parent_bid = Thread.current[:batch].bid if Thread.current[:batch]

      Sidekiq.redis do |r|
        r.multi do
          r.hset(@bidkey, "created_at", @created_at)
          r.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
          r.expire(@bidkey, BID_EXPIRE_TTL)
        end
      end

      @initialized = true
    end

    @ready_to_queue = []

    begin
      parent = Thread.current[:batch]
      Thread.current[:batch] = self
      yield
    ensure
      Thread.current[:batch] = parent
    end

    return [] if @ready_to_queue.size == 0

    Sidekiq.redis do |r|
      r.multi do
        if parent_bid
          r.hincrby("BID-#{parent_bid}", "children", 1)
          r.hincrby("BID-#{parent_bid}", "total", @ready_to_queue.size)
          r.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
        end

        r.hincrby(@bidkey, "pending", @ready_to_queue.size)
        r.hincrby(@bidkey, "total", @ready_to_queue.size)
        r.expire(@bidkey, BID_EXPIRE_TTL)

        r.sadd(@bidkey + "-jids", @ready_to_queue)
        r.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
      end
    end

    @ready_to_queue
  ensure
    Thread.current[:bid_data] = bid_data
  end
end

#on(event, callback, options = {}) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sidekiq/batch.rb', line 41

def on(event, callback, options = {})
  return unless %w(success complete).include?(event.to_s)
  callback_key = "#{@bidkey}-callbacks-#{event}"
  Sidekiq.redis do |r|
    r.multi do
      r.sadd(callback_key, JSON.unparse({
        callback: callback,
        opts: options
      }))
      r.expire(callback_key, BID_EXPIRE_TTL)
    end
  end
end

#parentObject



126
127
128
129
130
# File 'lib/sidekiq/batch.rb', line 126

def parent
  if parent_bid
    Sidekiq::Batch.new(parent_bid)
  end
end

#parent_bidObject



120
121
122
123
124
# File 'lib/sidekiq/batch.rb', line 120

def parent_bid
  Sidekiq.redis do |r|
    r.hget(@bidkey, "parent_bid")
  end
end

#valid?(batch = self) ⇒ Boolean

Returns:

  • (Boolean)


132
133
134
135
# File 'lib/sidekiq/batch.rb', line 132

def valid?(batch = self)
  valid = !Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
  batch.parent ? valid && valid?(batch.parent) : valid
end