Class: NATS::KeyValue

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/kv.rb

Defined Under Namespace

Modules: API, Manager Classes: BadBucketError, BucketNotFoundError, BucketStatus, Entry, Error, KeyDeletedError, KeyNotFoundError, KeyWrongLastSequenceError

Constant Summary collapse

KV_OP =
"KV-Operation"
KV_DEL =
"DEL"
KV_PURGE =
"PURGE"
MSG_ROLLUP_SUBJECT =
"sub"
MSG_ROLLUP_ALL =
"all"
ROLLUP =
"Nats-Rollup"
EXPECTED_LAST_SUBJECT_SEQUENCE =
"Nats-Expected-Last-Subject-Sequence"

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KeyValue

Returns a new instance of KeyValue.



25
26
27
28
29
30
31
# File 'lib/nats/io/kv.rb', line 25

def initialize(opts={})
  @name = opts[:name]
  @stream = opts[:stream]
  @pre = opts[:pre]
  @js = opts[:js]
  @direct = opts[:direct]
end

Instance Method Details

#create(key, value) ⇒ Object

create will add the key/value pair iff it does not exist.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/nats/io/kv.rb', line 131

def create(key, value)
  pa = nil
  begin
    pa = update(key, value, last: 0)
  rescue KeyWrongLastSequenceError => err
    # In case of attempting to recreate an already deleted key,
    # the client would get a KeyWrongLastSequenceError.  When this happens,
    # it is needed to fetch latest revision number and attempt to update.
    begin
      # NOTE: This reimplements the following behavior from Go client.
      #
      #   Since we have tombstones for DEL ops for watchers, this could be from that
      #   so we need to double check.
      #
      _get(key)

      # No exception so not a deleted key, so reraise the original KeyWrongLastSequenceError.
      # If it was deleted then the error exception will contain metadata
      # to recreate using the last revision.
      raise err
    rescue KeyDeletedError => err
      pa = update(key, value, last: err.entry.revision)
    end
  end

  pa
end

#delete(key, params = {}) ⇒ Object

delete will place a delete marker and remove all previous revisions.



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/nats/io/kv.rb', line 181

def delete(key, params={})
  hdrs = {}
  hdrs[KV_OP] = KV_DEL
  last = (params[:last] ||= 0)
  if last > 0
    hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s
  end
  ack = @js.publish("#{@pre}#{key}", header: hdrs)

  ack.seq
end

#get(key, params = {}) ⇒ Object

get returns the latest value for the key.



75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/kv.rb', line 75

def get(key, params={})
  entry = nil
  begin
    entry = _get(key, params)
  rescue KeyDeletedError
    raise KeyNotFoundError
  end

  entry
end

#purge(key) ⇒ Object

purge will remove the key and all revisions.



194
195
196
197
198
199
# File 'lib/nats/io/kv.rb', line 194

def purge(key)
  hdrs = {}
  hdrs[KV_OP] = KV_PURGE
  hdrs[ROLLUP] = MSG_ROLLUP_SUBJECT
  @js.publish("#{@pre}#{key}", header: hdrs)
end

#put(key, value) ⇒ Object

put will place the new value for the key into the store and return the revision number.



125
126
127
128
# File 'lib/nats/io/kv.rb', line 125

def put(key, value)
  ack = @js.publish("#{@pre}#{key}", value)
  ack.seq
end

#statusObject

status retrieves the status and configuration of a bucket.



202
203
204
205
# File 'lib/nats/io/kv.rb', line 202

def status
  info = @js.stream_info(@stream)
  BucketStatus.new(info, @name)
end

#update(key, value, params = {}) ⇒ Object

update will update the value iff the latest revision matches.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/nats/io/kv.rb', line 162

def update(key, value, params={})
  hdrs = {}
  last = (params[:last] ||= 0)
  hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s
  ack = nil
  begin
    ack = @js.publish("#{@pre}#{key}", value, header: hdrs)
  rescue NATS::JetStream::Error::APIError => err
    if err.err_code == 10071
      raise KeyWrongLastSequenceError.new(err.description)
    else
      raise err
    end
  end

  ack.seq
end