Class: NATS::KeyValue

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/nats/io/kv.rb,
lib/nats/io/kv/api.rb,
lib/nats/io/kv/errors.rb,
lib/nats/io/kv/manager.rb,
lib/nats/io/kv/bucket_status.rb

Defined Under Namespace

Modules: API, Manager Classes: BadBucketError, BucketNotFoundError, BucketStatus, Entry, Error, InvalidKeyError, KeyDeletedError, KeyHistoryTooLargeError, KeyNotFoundError, KeyWrongLastSequenceError, NoKeysFoundError

Constant Summary collapse

# Header name used to tag a message with a KV operation.
KV_OP =
"KV-Operation"
# KV_OP value published by #delete.
KV_DEL =
"DEL"
# KV_OP value published by #purge.
KV_PURGE =
"PURGE"
# ROLLUP header value published by #purge.
MSG_ROLLUP_SUBJECT =
"sub"
MSG_ROLLUP_ALL =
"all"
# Header name used together with MSG_ROLLUP_SUBJECT / MSG_ROLLUP_ALL.
ROLLUP =
"Nats-Rollup"
# Bucket and key patterns are anchored with \A ... \z so the WHOLE string must
# match. Ruby's `$` matches before any "\n", which would let
# "name\n<anything>" pass validation.
VALID_BUCKET_RE =
/\A[a-zA-Z0-9_-]+\z/
VALID_KEY_RE =
/\A[-\/_=\.a-zA-Z0-9]+\z/
# Header carrying the expected last sequence for optimistic updates.
EXPECTED_LAST_SUBJECT_SEQUENCE =
"Nats-Expected-Last-Subject-Sequence"
# Headers inspected by the watcher callback to detect control messages.
STATUS_HDR =
"Status"
DESC_HDR =
"Description"
# STATUS_HDR value that marks a control message (heartbeat / flow control).
CTRL_STATUS =
"100"
LAST_CONSUMER_SEQ_HDR =
"Nats-Last-Consumer"
LAST_STREAM_SEQ_HDR =
"Nats-Last-Stream"
CONSUMER_STALLED_HDR =
"Nats-Consumer-Stalled"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KeyValue

Returns a new instance of KeyValue.



50
51
52
53
54
55
56
57
# File 'lib/nats/io/kv.rb', line 50

# Stores the handles this KeyValue instance works with.
#
# @param opts [Hash] read for :name, :stream, :pre, :js, :direct and
#   :validate_keys; a missing entry leaves the instance variable nil.
def initialize(opts = {})
  @name, @stream, @pre = opts.values_at(:name, :stream, :pre)
  @js, @direct, @validate_keys = opts.values_at(:js, :direct, :validate_keys)
end

Class Method Details

.is_valid_key(key) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/nats/io/kv.rb', line 37

# Reports whether +key+ may be used as a KV key.
#
# A key is rejected when it is nil, starts or ends with ".", contains a
# newline, or does not match VALID_KEY_RE.
#
# @param key [String, nil] candidate key.
# @return [Boolean] true when the key is acceptable, false otherwise.
def is_valid_key(key)
  return false if key.nil?
  return false if key.start_with?(".") || key.end_with?(".")

  # Ruby's `$` anchor matches before any "\n", so a pattern that ends in `$`
  # accepts "valid\n<anything>". Reject newlines explicitly so the result
  # does not depend on how VALID_KEY_RE is anchored.
  return false if key.to_s.include?("\n")

  !(key =~ VALID_KEY_RE).nil?
end

Instance Method Details

#create(key, value) ⇒ Object

create will add the key/value pair iff it does not exist.

Raises:



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/nats/io/kv.rb', line 119

# create adds the key/value pair only when the key does not exist yet.
#
# It first attempts an update expecting last sequence 0. If that fails with
# KeyWrongLastSequenceError the key is looked up: a live key re-raises the
# original error, while a deleted key is recreated on top of the revision
# carried by the KeyDeletedError.
#
# @param key [String] key to create.
# @param value [String] value to store.
# @return [Object] whatever #update returns for the successful write.
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
# @raise [KeyWrongLastSequenceError] when the key already exists.
def create(key, value)
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  begin
    update(key, value, last: 0)
  rescue KeyWrongLastSequenceError => wrong_sequence
    # NOTE: This mirrors the Go client. DEL operations leave tombstones for
    # watchers, so the failure may come from a deleted key; double check.
    begin
      _get(key)

      # The lookup succeeded, so the key is live: surface the original error.
      raise wrong_sequence
    rescue KeyDeletedError => tombstone
      # The deleted entry carries the revision needed to recreate the key.
      update(key, value, last: tombstone.entry.revision)
    end
  end
end

#delete(key, params = {}) ⇒ Object

delete will place a delete marker and remove all previous revisions.

Raises:



173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/nats/io/kv.rb', line 173

# delete publishes a delete marker (KV_OP => KV_DEL) for the key.
#
# The caller's +params+ hash is only read, never modified, so frozen or
# shared option hashes are safe to pass.
#
# @param key [String] key to delete.
# @param params [Hash] optional :last, the expected last sequence; it is sent
#   as the EXPECTED_LAST_SUBJECT_SEQUENCE header only when greater than 0.
# @return [Object] the +seq+ of the publish ack.
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
def delete(key, params = {})
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  hdrs = { KV_OP => KV_DEL }
  last = params[:last] || 0
  hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s if last > 0

  ack = @js.publish("#{@pre}#{key}", header: hdrs)
  ack.seq
end

#get(key, params = {}) ⇒ Object

get returns the latest value for the key.

Raises:



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/nats/io/kv.rb', line 60

# get returns the latest entry for the key.
#
# A deleted key is reported to the caller as not found.
#
# @param key [String] key to look up.
# @param params [Hash] options forwarded unchanged to the internal lookup.
# @return [Object] the entry returned by the internal lookup.
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
# @raise [KeyNotFoundError] when the lookup reports the key as deleted.
def get(key, params = {})
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  begin
    _get(key, params)
  rescue KeyDeletedError
    # Callers of the public API see tombstones as a missing key.
    raise KeyNotFoundError
  end
end

#history(key, params = {}) ⇒ Object

history retrieves the entries so far for a key.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/nats/io/kv.rb', line 237

# history retrieves the entries recorded so far for a key.
#
# The watcher is created immediately; entries are only consumed when the
# returned Enumerator is iterated. Iteration ends at the first nil marker.
# The watcher is stopped when iteration ends for any reason (nil marker,
# early exit such as +first(n)+, or an exception), so it is not leaked.
#
# The caller's +params+ hash is not modified; a copy with
# +include_history: true+ is handed to #watch.
#
# @param key [String] key (pattern) passed to #watch.
# @param params [Hash] extra options forwarded to #watch.
# @return [Enumerator] yields each entry up to the first nil marker.
# @raise [NoKeysFoundError] from the Enumerator, when iteration completes
#   without having yielded any entry.
def history(key, params = {})
  w = watch(key, params.merge(include_history: true))
  got_keys = false

  Enumerator.new do |y|
    begin
      w.each do |entry|
        break if entry.nil?
        got_keys = true
        y << entry
      end
    ensure
      # Always release the watcher, even when the consumer stops early.
      w.stop
    end
    raise NoKeysFoundError unless got_keys
  end
end

#keys(params = {}) ⇒ Object

keys returns the keys from a KeyValue store. Optionally filters the keys based on the provided filter list.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/nats/io/kv.rb', line 218

# keys returns the keys from a KeyValue store.
#
# A watcher over all keys is created immediately with +ignore_deletes+ and
# +meta_only+ enabled; keys are only consumed when the returned Enumerator is
# iterated. Iteration ends at the first nil marker. The watcher is stopped
# when iteration ends for any reason (nil marker, early exit such as
# +first(n)+, or an exception), so it is not leaked.
#
# The caller's +params+ hash is not modified; a copy carrying the two forced
# options is handed to #watchall.
#
# @param params [Hash] extra options forwarded to #watchall.
# @return [Enumerator] yields +entry.key+ for each entry up to the first nil marker.
# @raise [NoKeysFoundError] from the Enumerator, when iteration completes
#   without having yielded any key.
def keys(params = {})
  w = watchall(params.merge(ignore_deletes: true, meta_only: true))
  got_keys = false

  Enumerator.new do |y|
    begin
      w.each do |entry|
        break if entry.nil?
        got_keys = true
        y << entry.key
      end
    ensure
      # Always release the watcher, even when the consumer stops early.
      w.stop
    end
    raise NoKeysFoundError unless got_keys
  end
end

#purge(key) ⇒ Object

purge will remove the key and all revisions.

Raises:



188
189
190
191
192
193
194
195
# File 'lib/nats/io/kv.rb', line 188

# purge publishes a purge marker for the key, together with a rollup header
# scoped to the key's subject.
#
# @param key [String] key to purge.
# @return [Object] the result of the publish call (the ack itself).
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
def purge(key)
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  purge_headers = { KV_OP => KV_PURGE, ROLLUP => MSG_ROLLUP_SUBJECT }
  @js.publish("#{@pre}#{key}", header: purge_headers)
end

#put(key, value) ⇒ Object

put will place the new value for the key into the store and return the revision number.

Raises:



111
112
113
114
115
116
# File 'lib/nats/io/kv.rb', line 111

# put publishes the new value for the key and returns the revision number.
#
# @param key [String] key to write.
# @param value [String] value to store.
# @return [Object] the +seq+ of the publish ack.
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
def put(key, value)
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  subject = "#{@pre}#{key}"
  @js.publish(subject, value).seq
end

#status ⇒ Object

status retrieves the status and configuration of a bucket.



198
199
200
201
# File 'lib/nats/io/kv.rb', line 198

# status fetches the stream info for this bucket's stream and wraps it,
# together with the bucket name, in a BucketStatus.
#
# @return [BucketStatus]
def status
  BucketStatus.new(@js.stream_info(@stream), @name)
end

#update(key, value, params = {}) ⇒ Object

update will update the value iff the latest revision matches.

Raises:



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/nats/io/kv.rb', line 152

# update writes the value only if the latest revision matches.
#
# The expected revision is always sent in the EXPECTED_LAST_SUBJECT_SEQUENCE
# header; when +params[:last]+ is absent it defaults to 0. The caller's
# +params+ hash is only read, never modified, so frozen or shared option
# hashes are safe to pass.
#
# @param key [String] key to update.
# @param value [String] new value.
# @param params [Hash] optional :last, the revision the key is expected to be at.
# @return [Object] the +seq+ of the publish ack.
# @raise [InvalidKeyError] when key validation is enabled and the key is invalid.
# @raise [KeyWrongLastSequenceError] when the server answers with API error code 10071.
# @raise [NATS::JetStream::Error::APIError] any other API error, re-raised unchanged.
def update(key, value, params = {})
  raise InvalidKeyError if @validate_keys && !KeyValue.is_valid_key(key)

  last = params[:last] || 0
  hdrs = { EXPECTED_LAST_SUBJECT_SEQUENCE => last.to_s }

  ack = begin
    @js.publish("#{@pre}#{key}", value, header: hdrs)
  rescue NATS::JetStream::Error::APIError => err
    # 10071 is translated into the KV-level "wrong last sequence" error.
    raise KeyWrongLastSequenceError.new(err.description) if err.err_code == 10071

    raise
  end

  ack.seq
end

#watch(keys, params = {}) ⇒ Object

watch will be signaled when a key that matches the keys pattern is updated. The first update after starting the watch is nil in case there are no pending updates.



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/nats/io/kv.rb', line 264

# watch subscribes to the subject "#{@pre}#{keys}" and returns a KeyWatcher
# that receives an Entry for every matching message.
#
# A nil marker is pushed to the watcher once the initial backlog has been
# delivered (or immediately, when nothing is pending), which unblocks
# iterators. A timer task checks every 2 * idle_heartbeat seconds whether the
# watcher has seen traffic and recreates the consumer if it no longer exists.
#
# NOTE: defaults are written into +params+ with `||=`, so the hash passed in
# is modified.
#
# @param keys [String] key pattern appended to the bucket subject prefix.
# @param params [Hash] options:
#   :meta_only (default false) - sent as the consumer's headers_only,
#   :include_history (default false) - when false, deliver_policy is "last_per_subject",
#   :ignore_deletes (default false) - skip entries whose operation is DEL or PURGE,
#   :idle_heartbeat (default 5, seconds),
#   :inactive_threshold (default 300, i.e. 5 minutes).
# @return [KeyWatcher] watcher with the subscription and heartbeat task attached.
# @raise [StandardError] any error raised while reading the consumer info;
#   the subscription is unsubscribed before re-raising.
def watch(keys, params = {})
  params[:meta_only] ||= false
  params[:include_history] ||= false
  params[:ignore_deletes] ||= false
  params[:idle_heartbeat] ||= 5 # seconds
  params[:inactive_threshold] ||= 5 * 60 # 5 minutes
  subject = "#{@pre}#{keys}"
  init_setup = new_cond
  init_setup_done = false
  nc = @js.nc
  watcher = KeyWatcher.new(@js)

  # Left nil (no explicit policy) when the full history is requested.
  deliver_policy = if !(params[:include_history])
    "last_per_subject"
  end

  ordered = {
    # basic ordered consumer.
    flow_control: true,
    ack_policy: "none",
    max_deliver: 1,
    ack_wait: 22 * 3600,
    idle_heartbeat: params[:idle_heartbeat],
    num_replicas: 1,
    mem_storage: true,
    manual_ack: true,
    # watch related options.
    deliver_policy: deliver_policy,
    headers_only: params[:meta_only],
    inactive_threshold: params[:inactive_threshold]
  }

  # watch_updates callback.
  sub = @js.subscribe(subject, config: ordered) do |msg|
    # Hold back message processing until the setup below has signaled
    # (or the wait times out).
    synchronize do
      if !init_setup_done
        init_setup.wait(@js.opts[:timeout])
      end
    end

    # Control Message like Heartbeats and Flow Control
    status = msg.header[STATUS_HDR] unless msg.header.nil?
    if !status.nil? && status == CTRL_STATUS
      # The description header may be missing; treat that as an empty
      # description instead of raising inside the callback.
      desc = msg.header[DESC_HDR].to_s
      if desc.start_with?("Idle")
        # A watcher is active if it continues to receive Idle Heartbeat messages.
        #
        # Status: 100
        # Description: Idle Heartbeat
        # Nats-Last-Consumer: 185
        # Nats-Last-Stream: 185
        #
        watcher.synchronize { watcher._active = true }
      elsif desc.start_with?("FlowControl")
        # HMSG _INBOX.q6Y3JAFxOnNJi4QdwQnFtg 2 $JS.FC.KV_TEST.t00CunIG.GT4W 36 36
        # NATS/1.0 100 FlowControl Request
        nc.publish(msg.reply)
      end
      # Skip processing the control message
      next
    end

    # Metadata of the message (sequences, pending count, timestamp) used
    # below to build the Entry and to detect the end of the initial backlog.
    meta = msg.metadata
    watcher.synchronize { watcher._active = true }
    # Track the sequences
    #
    # $JS.ACK.KV_TEST.CKRGrWpf.1.10.10.1739859923871837000.0
    #
    tokens = msg.reply.split(".")
    sseq = tokens[5]
    dseq = tokens[6]
    watcher.synchronize do
      watcher._dseq = dseq.to_i + 1
      watcher._sseq = sseq.to_i
    end

    # Keys() handling
    op = nil
    if msg.header && msg.header[KV_OP]
      op = msg.header[KV_OP]
      if params[:ignore_deletes]
        if (op == KV_PURGE) || (op == KV_DEL)
          if (meta.num_pending == 0) && !watcher._init_done
            # Push this to unblock enumerators.
            watcher._updates.push(nil)
            watcher._init_done = true
          end
          next
        end
      end
    end

    # Convert the msg into an Entry.
    key = msg.subject[@pre.size...msg.subject.size]
    entry = Entry.new(
      bucket: @name,
      key: key,
      value: msg.data,
      revision: meta.sequence.stream,
      delta: meta.num_pending,
      created: meta.timestamp,
      operation: op
    )
    watcher._updates.push(entry)

    # When there are no more updates send an empty marker
    # to signal that it is done, this will unblock iterators.
    if (meta.num_pending == 0) && !watcher._init_done
      watcher._updates.push(nil)
      watcher._init_done = true
    end
  end # end of callback
  watcher._sub = sub

  # Snapshot the deliver subject for the consumer.
  deliver_subject = sub.subject

  # Check from consumer info what is the number of messages
  # awaiting to be consumed to send the initial signal marker.
  stream_name = nil
  begin
    cinfo = sub.consumer_info
    stream_name = cinfo.stream_name

    synchronize do
      init_setup_done = true
      # If no delivered and/or pending messages, then signal
      # that this is the start.
      # The consumer subscription will start receiving messages
      # so need to check those that have already made it.
      received = sub.delivered
      init_setup.signal

      # When there are no more updates send an empty marker
      # to signal that it is done, this will unblock iterators.
      if (cinfo.num_pending == 0) && (received == 0)
        watcher._updates.push(nil)
        watcher._init_done = true
      end
    end
  rescue => err
    # cancel init
    sub.unsubscribe
    raise err
  end

  # Need to handle reconnect if missing too many heartbeats.
  hb_interval = params[:idle_heartbeat] * 2
  watcher._hb_task = Concurrent::TimerTask.new(execution_interval: hb_interval) do |task|
    task.shutdown if nc.closed?
    next unless nc.connected?

    # Wait for all idle heartbeats to be received, one of them would have
    # toggled the state of the consumer back to being active.
    active = watcher.synchronize {
      current = watcher._active
      # A heartbeat or another incoming message needs to toggle back.
      watcher._active = false
      current
    }
    if !active
      # Same consumer configuration, restarted from the last stream
      # sequence seen and delivering to the original deliver subject.
      ccreq = ordered.dup
      ccreq[:deliver_policy] = "by_start_sequence"
      ccreq[:opt_start_seq] = watcher._sseq
      ccreq[:deliver_subject] = deliver_subject
      ccreq[:idle_heartbeat] = ordered[:idle_heartbeat]
      ccreq[:inactive_threshold] = ordered[:inactive_threshold]

      should_recreate = false
      begin
        # Check if the original is still present, if it is then do not recreate.
        begin
          sub.consumer_info
        rescue ::NATS::JetStream::Error::ConsumerNotFound => e
          e.stream ||= sub.jsi.stream
          e.consumer ||= sub.jsi.consumer
          @js.nc.send(:err_cb_call, @js.nc, e, sub)
          should_recreate = true
        end
        next unless should_recreate

        # Recreate consumer that went away after a restart.
        cinfo = @js.add_consumer(stream_name, ccreq)
        sub.jsi.consumer = cinfo.name
        watcher.synchronize { watcher._dseq = 1 }
      rescue => e
        # Dispatch to the error NATS client error callback.
        @js.nc.send(:err_cb_call, @js.nc, e, sub)
      end
    end
  rescue => e
    # WRN: Unexpected error
    @js.nc.send(:err_cb_call, @js.nc, e, sub)
  end
  watcher._hb_task.execute

  watcher
end

#watchall(params = {}) ⇒ Object

watchall will be signaled when any key is updated.



212
213
214
# File 'lib/nats/io/kv.rb', line 212

# watchall watches every key by delegating to #watch with the ">" pattern.
#
# @param params [Hash] options forwarded unchanged to #watch.
# @return [Object] whatever #watch returns.
def watchall(params = {})
  every_key = ">"
  watch(every_key, params)
end