Module: NATS::JetStream::API
- Defined in: lib/nats/io/jetstream/api.rb
Overview
JetStream::API contains the types used to interact with the JetStream API.
Defined Under Namespace
Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState
Constant Summary
- Error =
Error used when the server responds with an error from the JetStream API.
::NATS::JetStream::Error::APIError
Instance Attribute Summary
- #ack_floor ⇒ SequenceInfo
- #ack_policy ⇒ String
- #ack_wait ⇒ Integer
- #bytes ⇒ Integer
- #cluster ⇒ Hash
- #config ⇒ Hash
- #consumer_count ⇒ Integer
- #consumer_seq ⇒ Integer — The consumer sequence.
- #created ⇒ String
- #deliver_policy ⇒ String
- #delivered ⇒ SequenceInfo
- #did_create ⇒ Boolean
- #discard ⇒ String
- #duplicate_window ⇒ Integer
- #durable_name ⇒ String
- #first_seq ⇒ Integer
- #last_seq ⇒ Integer
- #max_ack_pending ⇒ Integer
- #max_age ⇒ Integer
- #max_bytes ⇒ Integer
- #max_consumers ⇒ Integer
- #max_deliver ⇒ Integer
- #max_msg_size ⇒ Integer
- #max_msgs ⇒ Integer
- #max_msgs_per_subject ⇒ Integer
- #max_waiting ⇒ Integer
- #messages ⇒ Integer
- #name ⇒ String
- #num_ack_pending ⇒ Integer
- #num_pending ⇒ Integer
- #num_redelivered ⇒ Integer
- #num_replicas ⇒ Integer
- #num_waiting ⇒ Integer
- #replay_policy ⇒ String
- #retention ⇒ String
- #state ⇒ StreamState
- #storage ⇒ String
- #stream_name ⇒ String — Name of the stream to which the consumer belongs.
- #stream_seq ⇒ Integer — The stream sequence.
- #subjects ⇒ Array
- #type ⇒ String
Instance Attribute Details
#ack_floor ⇒ SequenceInfo
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#ack_policy ⇒ String
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#ack_wait ⇒ Integer
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#bytes ⇒ Integer
250 251 252 253 254 255 256 257 258 |
# File 'lib/nats/io/jetstream/api.rb', line 250 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts = {}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#cluster ⇒ Hash
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#config ⇒ Hash
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#consumer_count ⇒ Integer
250 251 252 253 254 255 256 257 258 |
# File 'lib/nats/io/jetstream/api.rb', line 250 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts = {}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#consumer_seq ⇒ Integer
Returns the consumer sequence.
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/nats/io/jetstream/api.rb', line 33 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#created ⇒ String
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#deliver_policy ⇒ String
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#delivered ⇒ SequenceInfo
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#did_create ⇒ Boolean
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#discard ⇒ String
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#duplicate_window ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#durable_name ⇒ String
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#first_seq ⇒ Integer
250 251 252 253 254 255 256 257 258 |
# File 'lib/nats/io/jetstream/api.rb', line 250 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts = {}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#last_seq ⇒ Integer
250 251 252 253 254 255 256 257 258 |
# File 'lib/nats/io/jetstream/api.rb', line 250 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts = {}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_ack_pending ⇒ Integer
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_age ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_bytes ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_consumers ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_deliver ⇒ Integer
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_msg_size ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_msgs ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_msgs_per_subject ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#max_waiting ⇒ Integer
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#messages ⇒ Integer
250 251 252 253 254 255 256 257 258 |
# File 'lib/nats/io/jetstream/api.rb', line 250 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts = {}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#name ⇒ String
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_ack_pending ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_pending ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_redelivered ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_replicas ⇒ Integer
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#num_waiting ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#replay_policy ⇒ String
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/nats/io/jetstream/api.rb', line 108 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#retention ⇒ String
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#state ⇒ StreamState
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#storage ⇒ String
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#stream_name ⇒ String
Returns the name of the stream to which the consumer belongs.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#stream_seq ⇒ Integer
Returns the stream sequence.
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/nats/io/jetstream/api.rb', line 33 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts = {}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#subjects ⇒ Array
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |
#type ⇒ String
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/nats/io/jetstream/api.rb', line 174 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true ) do def initialize(opts = {}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super end end |