Module: NATS::JetStream::API
- Defined in:
- lib/nats/io/js.rb
Overview
JetStream::API contains the types used to interact with the JetStream API.
Defined Under Namespace
Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState
Constant Summary collapse
- Error =
Error type used when the server responds with an error from the JetStream API.
::NATS::JetStream::Error::APIError
Instance Attribute Summary collapse
- #ack_floor ⇒ SequenceInfo
- #ack_policy ⇒ String
- #ack_wait ⇒ Integer
- #bytes ⇒ Integer
- #cluster ⇒ Hash
- #config ⇒ Hash
- #consumer_count ⇒ Integer
-
#consumer_seq ⇒ Integer
The consumer sequence.
- #created ⇒ String
- #deliver_policy ⇒ String
- #delivered ⇒ SequenceInfo
- #did_create ⇒ Boolean
- #discard ⇒ String
- #duplicate_window ⇒ Integer
- #durable_name ⇒ String
- #first_seq ⇒ Integer
- #last_seq ⇒ Integer
- #max_ack_pending ⇒ Integer
- #max_age ⇒ Integer
- #max_bytes ⇒ Integer
- #max_consumers ⇒ Integer
- #max_deliver ⇒ Integer
- #max_msg_size ⇒ Integer
- #max_msgs ⇒ Integer
- #max_msgs_per_subject ⇒ Integer
- #max_waiting ⇒ Integer
- #messages ⇒ Integer
- #name ⇒ String
- #num_ack_pending ⇒ Integer
- #num_pending ⇒ Integer
- #num_redelivered ⇒ Integer
- #num_replicas ⇒ Integer
- #num_waiting ⇒ Integer
- #replay_policy ⇒ String
- #retention ⇒ String
- #state ⇒ StreamState
- #storage ⇒ String
-
#stream_name ⇒ String
Name of the stream to which the consumer belongs.
-
#stream_seq ⇒ Integer
The stream sequence.
- #subjects ⇒ Array
- #type ⇒ String
Instance Attribute Details
#ack_floor ⇒ SequenceInfo
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#ack_policy ⇒ String
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#ack_wait ⇒ Integer
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#bytes ⇒ Integer
1269 1270 1271 1272 1273 1274 1275 1276 1277 |
# File 'lib/nats/io/js.rb', line 1269 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#cluster ⇒ Hash
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#config ⇒ Hash
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#consumer_count ⇒ Integer
1269 1270 1271 1272 1273 1274 1275 1276 1277 |
# File 'lib/nats/io/js.rb', line 1269 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#consumer_seq ⇒ Integer
Returns the consumer sequence.
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 |
# File 'lib/nats/io/js.rb', line 1066 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#created ⇒ String
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#deliver_policy ⇒ String
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#delivered ⇒ SequenceInfo
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#did_create ⇒ Boolean
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#discard ⇒ String
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#duplicate_window ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#durable_name ⇒ String
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#first_seq ⇒ Integer
1269 1270 1271 1272 1273 1274 1275 1276 1277 |
# File 'lib/nats/io/js.rb', line 1269 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#last_seq ⇒ Integer
1269 1270 1271 1272 1273 1274 1275 1276 1277 |
# File 'lib/nats/io/js.rb', line 1269 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_ack_pending ⇒ Integer
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_age ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_bytes ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_consumers ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_deliver ⇒ Integer
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_msg_size ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_msgs ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_msgs_per_subject ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#max_waiting ⇒ Integer
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#messages ⇒ Integer
1269 1270 1271 1272 1273 1274 1275 1276 1277 |
# File 'lib/nats/io/js.rb', line 1269 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#name ⇒ String
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_ack_pending ⇒ Integer
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_pending ⇒ Integer
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_redelivered ⇒ Integer
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_replicas ⇒ Integer
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#num_waiting ⇒ Integer
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#replay_policy ⇒ String
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/nats/io/js.rb', line 1139 ConsumerConfig = Struct.new(:durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :memory_storage, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#retention ⇒ String
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#state ⇒ StreamState
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#storage ⇒ String
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#stream_name ⇒ String
Returns the name of the stream to which the consumer belongs.
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 |
# File 'lib/nats/io/js.rb', line 1101 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / 1_000_000_000 opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#stream_seq ⇒ Integer
Returns the stream sequence.
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 |
# File 'lib/nats/io/js.rb', line 1066 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#subjects ⇒ Array
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |
#type ⇒ String
1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 |
# File 'lib/nats/io/js.rb', line 1210 StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :allow_direct, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end def to_json(*args) config = self.to_h config.delete_if { |_k, v| v.nil? } config.to_json(*args) end end |