Class: NATS::KeyValue
- Inherits:
-
Object
- Object
- NATS::KeyValue
- Defined in:
- lib/nats/io/kv.rb
Defined Under Namespace
Modules: API, Manager Classes: BadBucketError, BucketNotFoundError, BucketStatus, Entry, Error, KeyDeletedError, KeyNotFoundError, KeyWrongLastSequenceError
Constant Summary collapse
- KV_OP =
"KV-Operation"
- KV_DEL =
"DEL"
- KV_PURGE =
"PURGE"
- MSG_ROLLUP_SUBJECT =
"sub"
- MSG_ROLLUP_ALL =
"all"
- ROLLUP =
"Nats-Rollup"
- EXPECTED_LAST_SUBJECT_SEQUENCE =
"Nats-Expected-Last-Subject-Sequence"
Instance Method Summary collapse
-
#create(key, value) ⇒ Object
create will add the key/value pair iff it does not exist.
-
#delete(key, params = {}) ⇒ Object
delete will place a delete marker and remove all previous revisions.
-
#get(key, params = {}) ⇒ Object
get returns the latest value for the key.
-
#initialize(opts = {}) ⇒ KeyValue
constructor
A new instance of KeyValue.
-
#purge(key) ⇒ Object
purge will remove the key and all revisions.
-
#put(key, value) ⇒ Object
put will place the new value for the key into the store and return the revision number.
-
#status ⇒ Object
status retrieves the status and configuration of a bucket.
-
#update(key, value, params = {}) ⇒ Object
update will update the value iff the latest revision matches.
Constructor Details
#initialize(opts = {}) ⇒ KeyValue
Returns a new instance of KeyValue.
25 26 27 28 29 30 31 |
# File 'lib/nats/io/kv.rb', line 25 def initialize(opts={}) @name = opts[:name] @stream = opts[:stream] @pre = opts[:pre] @js = opts[:js] @direct = opts[:direct] end |
Instance Method Details
#create(key, value) ⇒ Object
create will add the key/value pair iff it does not exist.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/nats/io/kv.rb', line 131 def create(key, value) pa = nil begin pa = update(key, value, last: 0) rescue KeyWrongLastSequenceError => err # In case of attempting to recreate an already deleted key, # the client would get a KeyWrongLastSequenceError. When this happens, # it is needed to fetch latest revision number and attempt to update. begin # NOTE: This reimplements the following behavior from Go client. # # Since we have tombstones for DEL ops for watchers, this could be from that # so we need to double check. # _get(key) # No exception so not a deleted key, so reraise the original KeyWrongLastSequenceError. # If it was deleted then the error exception will contain metadata # to recreate using the last revision. raise err rescue KeyDeletedError => err pa = update(key, value, last: err.entry.revision) end end pa end |
#delete(key, params = {}) ⇒ Object
delete will place a delete marker and remove all previous revisions.
181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/nats/io/kv.rb', line 181 def delete(key, params={}) hdrs = {} hdrs[KV_OP] = KV_DEL last = (params[:last] ||= 0) if last > 0 hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s end ack = @js.publish("#{@pre}#{key}", header: hdrs) ack.seq end |
#get(key, params = {}) ⇒ Object
get returns the latest value for the key.
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/kv.rb', line 75 def get(key, params={}) entry = nil begin entry = _get(key, params) rescue KeyDeletedError raise KeyNotFoundError end entry end |
#purge(key) ⇒ Object
purge will remove the key and all revisions.
194 195 196 197 198 199 |
# File 'lib/nats/io/kv.rb', line 194 def purge(key) hdrs = {} hdrs[KV_OP] = KV_PURGE hdrs[ROLLUP] = MSG_ROLLUP_SUBJECT @js.publish("#{@pre}#{key}", header: hdrs) end |
#put(key, value) ⇒ Object
put will place the new value for the key into the store and return the revision number.
125 126 127 128 |
# File 'lib/nats/io/kv.rb', line 125 def put(key, value) ack = @js.publish("#{@pre}#{key}", value) ack.seq end |
#status ⇒ Object
status retrieves the status and configuration of a bucket.
202 203 204 205 |
# File 'lib/nats/io/kv.rb', line 202 def status info = @js.stream_info(@stream) BucketStatus.new(info, @name) end |
#update(key, value, params = {}) ⇒ Object
update will update the value iff the latest revision matches.
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/nats/io/kv.rb', line 162 def update(key, value, params={}) hdrs = {} last = (params[:last] ||= 0) hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s ack = nil begin ack = @js.publish("#{@pre}#{key}", value, header: hdrs) rescue NATS::JetStream::Error::APIError => err if err.err_code == 10071 raise KeyWrongLastSequenceError.new(err.description) else raise err end end ack.seq end |