Module: NATS::JetStream::API

Defined in:
lib/nats/io/jetstream/api.rb

Overview

JetStream::API contains the types used to interact with the JetStream API.

Defined Under Namespace

Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState

Constant Summary collapse

Error =

When the server responds with an error from the JetStream API.

::NATS::JetStream::Error::APIError

Instance Attribute Summary collapse

Instance Attribute Details

#ack_floorSequenceInfo

Returns:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#ack_policyString

Returns:

  • (String)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#ack_waitInteger

Returns:

  • (Integer)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#bytesInteger

Returns:

  • (Integer)


247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream/api.rb', line 247

# Struct holding stream state counters and sequence/timestamp markers.
# Members that are not supplied are left as nil.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # Builds the state from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
  end
end

#clusterHash

Returns:

  • (Hash)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#configHash

Returns:

  • (ConsumerConfig)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#consumer_countInteger

Returns:

  • (Integer)


247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream/api.rb', line 247

# Struct holding stream state counters and sequence/timestamp markers.
# Members that are not supplied are left as nil.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # Builds the state from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
  end
end

#consumer_seqInteger

Returns the consumer sequence.

Returns:

  • (Integer)

    The consumer sequence.



31
32
33
34
35
36
37
38
39
40
# File 'lib/nats/io/jetstream/api.rb', line 31

# Struct holding :consumer_seq, :stream_seq and :last_active. Instances are
# frozen once built.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # Builds a frozen SequenceInfo from a hash, ignoring keys that are not
  # struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
    freeze
  end
end

#createdString

Returns:

  • (Time)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#deliver_policyString

Returns:

  • (String)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#deliveredSequenceInfo

Returns:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#did_createBoolean

Returns:

  • (Boolean)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#discardString

Returns:

  • (String)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#duplicate_windowInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#durable_nameString

Returns:

  • (String)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#first_seqInteger

Returns:

  • (Integer)


247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream/api.rb', line 247

# Struct holding stream state counters and sequence/timestamp markers.
# Members that are not supplied are left as nil.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # Builds the state from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
  end
end

#last_seqInteger

Returns:

  • (Integer)


247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream/api.rb', line 247

# Struct holding stream state counters and sequence/timestamp markers.
# Members that are not supplied are left as nil.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # Builds the state from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
  end
end

#max_ack_pendingInteger

Returns:

  • (Integer)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_ageInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_bytesInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_consumersInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_deliverInteger

Returns:

  • (Integer)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_msg_sizeInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_msgsInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_msgs_per_subjectInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#max_waitingInteger

Returns:

  • (Integer)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# Struct describing a consumer configuration. Members that are not supplied
# are left as nil.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#messagesInteger

Returns:

  • (Integer)


247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream/api.rb', line 247

# Struct holding stream state counters and sequence/timestamp markers.
# Members that are not supplied are left as nil.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # Builds the state from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Slice into a new hash instead of deleting unknown keys in place, so the
    # caller's hash (possibly frozen) is left intact; unknown keys would make
    # the keyword_init Struct raise ArgumentError.
    super(opts.slice(*members))
  end
end

#nameString

Returns:

  • (String)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_ack_pendingInteger

Returns:

  • (Integer)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_pendingInteger

Returns:

  • (Integer)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_redeliveredInteger

Returns:

  • (Integer)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_replicasInteger

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# Struct describing a stream configuration. Members that are not supplied are
# left as nil.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash, ignoring keys that are not struct members.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). The hash is
  #   only read; it is never modified.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting unknown
    # keys in place: the keyword_init Struct raises ArgumentError on unknown
    # keys, and the caller's hash (possibly frozen) must stay intact.
    super(opts.slice(*members))
  end
end

#num_waitingInteger

Returns:

  • (Integer)


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Struct holding consumer information. Instances are frozen once built.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds a frozen ConsumerInfo from a hash of attributes.
  #
  # @param opts [Hash] attributes keyed by member name (Symbol). Keys that are
  #   not struct members are ignored. Neither this hash nor its nested
  #   :config hash is modified. :created, :config, :ack_floor and :delivered
  #   are expected to be present.
  def initialize(opts={})
    # Copy the known members into a fresh hash so the caller's hash is left
    # untouched; unknown keys would make the keyword_init Struct raise.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(attrs[:created])
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered])
    # Duplicate the nested config before rewriting ack_wait, again to avoid
    # mutating caller-owned data.
    config = attrs[:config].dup
    # ack_wait is divided by NATS::NANOSECONDS; a config without ack_wait is
    # passed through instead of failing on nil.
    config[:ack_wait] = config[:ack_wait] / ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)
    # The cluster value is discarded, so #cluster always returns nil.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#replay_policyString

Returns:

  • (String)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/nats/io/jetstream/api.rb', line 104

# JetStream consumer configuration, modeled as a keyword-initialized Struct.
ConsumerConfig = Struct.new(:name, :durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :mem_storage,

                            # NATS v2.10 features
                            :metadata, :filter_subjects, :max_bytes,
                            keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end

#retentionString

Returns:

  • (String)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# JetStream stream configuration, modeled as a keyword-initialized Struct.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end

#stateStreamState

Returns:

  • (StreamState)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# JetStream stream configuration, modeled as a keyword-initialized Struct.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end

#storageString

Returns:

  • (String)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# JetStream stream configuration, modeled as a keyword-initialized Struct.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end

#stream_nameString

Returns name of the stream to which the consumer belongs.

Returns:

  • (String)

    name of the stream to which the consumer belongs.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/nats/io/jetstream/api.rb', line 66

# Consumer information, modeled as a frozen, keyword-initialized Struct.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # Builds the info object from a hash, converting nested values into their
  # own types and dropping keys that are not members of this Struct.
  #
  # All conversions are applied to copies: neither +opts+ nor the nested
  # hashes it references are modified, so frozen input is accepted.
  #
  # @param opts [Hash{Symbol => Object}] attributes. +:created+ is parsed with
  #   Time.parse, +:ack_floor+ and +:delivered+ are wrapped in SequenceInfo,
  #   and +:config+ is wrapped in ConsumerConfig.
  # @raise [TypeError, ArgumentError] when +:created+ cannot be parsed.
  def initialize(opts={})
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor].dup)
    attrs[:delivered] = SequenceInfo.new(opts[:delivered].dup)

    config = opts[:config].dup
    # ack_wait is divided by ::NATS::NANOSECONDS (presumably nanoseconds to
    # seconds - confirm against the constant); skipped when it is absent.
    config[:ack_wait] /= ::NATS::NANOSECONDS if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # The cluster member is always left unset on the resulting object.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#stream_seqInteger

Returns The stream sequence.

Returns:

  • (Integer)

    The stream sequence.



31
32
33
34
35
36
37
38
39
40
# File 'lib/nats/io/jetstream/api.rb', line 31

# Sequence information, modeled as a frozen, keyword-initialized Struct.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # Builds the object from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct, then freezes the instance.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
    freeze
  end
end

#subjectsArray

Returns:

  • (Array)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# JetStream stream configuration, modeled as a keyword-initialized Struct.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end

#typeString

Returns:

  • (String)


172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/nats/io/jetstream/api.rb', line 172

# JetStream stream configuration, modeled as a keyword-initialized Struct.
StreamConfig = Struct.new(
  :name,
  :description,
  :subjects,
  :retention,
  :max_consumers,
  :max_msgs,
  :max_bytes,
  :discard,
  :max_age,
  :max_msgs_per_subject,
  :max_msg_size,
  :storage,
  :num_replicas,
  :no_ack,
  :duplicate_window,
  :placement,
  :mirror,
  :sources,
  :sealed,
  :deny_delete,
  :deny_purge,
  :allow_rollup_hdrs,
  :republish,
  :allow_direct,
  :mirror_direct,
  :metadata,
  keyword_init: true) do
  # Builds the config from a hash (or keyword arguments), keeping only the
  # keys that are members of this Struct.
  #
  # The recognized attributes are collected into a new hash, so the hash
  # handed in by the caller is left untouched and may be frozen.
  #
  # @param opts [Hash{Symbol => Object}] attributes; unrecognized keys are ignored.
  def initialize(opts={})
    super(opts.slice(*members))
  end
end