Module: NATS::JetStream::API

Defined in:
lib/nats/io/js.rb

Overview

JetStream::API contains the types used to interact with the JetStream API.

Defined Under Namespace

Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState

Constant Summary collapse

Error = ::NATS::JetStream::Error::APIError

Used when the server responds with an error from the JetStream API.

Instance Attribute Summary collapse

Instance Attribute Details

#ack_floor ⇒ SequenceInfo

Returns:

  • (SequenceInfo)

1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#ack_policyString

Returns:

  • (String)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#ack_waitInteger

Returns:

  • (Integer)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#bytesInteger

Returns:

  • (Integer)


1269
1270
1271
1272
1273
1274
1275
1276
1277
# File 'lib/nats/io/js.rb', line 1269

# Keyword-initialized Struct with the members +messages+, +bytes+,
# +first_seq+, +first_ts+, +last_seq+, +last_ts+ and +consumer_count+.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting the
    # unknown ones from the caller's hash.
    super(opts.slice(*members))
  end
end

#clusterHash

Returns:

  • (Hash)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#configHash

Returns:

  • (Hash)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#consumer_countInteger

Returns:

  • (Integer)


1269
1270
1271
1272
1273
1274
1275
1276
1277
# File 'lib/nats/io/js.rb', line 1269

# Keyword-initialized Struct with the members +messages+, +bytes+,
# +first_seq+, +first_ts+, +last_seq+, +last_ts+ and +consumer_count+.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting the
    # unknown ones from the caller's hash.
    super(opts.slice(*members))
  end
end

#consumer_seqInteger

Returns The consumer sequence.

Returns:

  • (Integer)

    The consumer sequence.



1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
# File 'lib/nats/io/js.rb', line 1066

# Frozen, keyword-initialized Struct with the members +consumer_seq+,
# +stream_seq+ and +last_active+.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields by selecting the known members into a new
    # hash (the caller's hash is left alone), then freeze the instance.
    super(opts.slice(*members))
    freeze
  end
end

#createdString

Returns:

  • (String)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#deliver_policyString

Returns:

  • (String)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#delivered ⇒ SequenceInfo

Returns:

  • (SequenceInfo)

1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#did_createBoolean

Returns:

  • (Boolean)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#discardString

Returns:

  • (String)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#duplicate_windowInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#durable_nameString

Returns:

  • (String)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#first_seqInteger

Returns:

  • (Integer)


1269
1270
1271
1272
1273
1274
1275
1276
1277
# File 'lib/nats/io/js.rb', line 1269

# Keyword-initialized Struct with the members +messages+, +bytes+,
# +first_seq+, +first_ts+, +last_seq+, +last_ts+ and +consumer_count+.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting the
    # unknown ones from the caller's hash.
    super(opts.slice(*members))
  end
end

#last_seqInteger

Returns:

  • (Integer)


1269
1270
1271
1272
1273
1274
1275
1276
1277
# File 'lib/nats/io/js.rb', line 1269

# Keyword-initialized Struct with the members +messages+, +bytes+,
# +first_seq+, +first_ts+, +last_seq+, +last_ts+ and +consumer_count+.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting the
    # unknown ones from the caller's hash.
    super(opts.slice(*members))
  end
end

#max_ack_pendingInteger

Returns:

  • (Integer)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_ageInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_bytesInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_consumersInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_deliverInteger

Returns:

  • (Integer)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_msg_sizeInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_msgsInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_msgs_per_subjectInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_waitingInteger

Returns:

  • (Integer)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Consumer configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#messagesInteger

Returns:

  • (Integer)


1269
1270
1271
1272
1273
1274
1275
1276
1277
# File 'lib/nats/io/js.rb', line 1269

# Keyword-initialized Struct with the members +messages+, +bytes+,
# +first_seq+, +first_ts+, +last_seq+, +last_ts+ and +consumer_count+.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. The hash is not modified, so frozen or reused
  #   hashes may be passed.
  def initialize(opts={})
    # Select the known members into a new hash rather than deleting the
    # unknown ones from the caller's hash.
    super(opts.slice(*members))
  end
end

#nameString

Returns:

  • (String)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_ack_pendingInteger

Returns:

  • (Integer)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_pendingInteger

Returns:

  • (Integer)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_redeliveredInteger

Returns:

  • (Integer)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Frozen, keyword-initialized Struct describing a consumer.
#
# The constructor converts some raw values before storing them:
# +created+ is parsed with Time.parse, +ack_floor+ and +delivered+ are
# wrapped in SequenceInfo, and +config+ is wrapped in ConsumerConfig.
# The +cluster+ entry is always discarded, so #cluster is nil.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. Keys that are not struct
  #   members are ignored. This method itself does not modify +opts+ or
  #   +opts[:config]+, so frozen or reused hashes may be passed.
  # @raise [TypeError] from Time.parse when +opts[:created]+ is not a String.
  def initialize(opts={})
    # Keep only recognized fields, in a fresh hash owned by this method.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor])
    attrs[:delivered] = SequenceInfo.new(opts[:delivered])

    # Copy the nested config before rescaling ack_wait so the caller's hash
    # keeps its original value.
    config = opts[:config].dup
    ack_wait = config[:ack_wait]
    # Presumably converts nanoseconds to seconds - confirm against the
    # server API. A missing ack_wait is left as nil instead of raising.
    config[:ack_wait] = ack_wait / 1_000_000_000 unless ack_wait.nil?
    attrs[:config] = ConsumerConfig.new(config)

    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#num_replicasInteger

Returns:

  • (Integer)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Stream configuration as a keyword-initialized Struct.
#
# Unknown keys given to the constructor are ignored, and #to_json emits only
# the members that are not nil.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] attributes keyed by Symbol. The hash is not modified,
  #   so frozen or reused hashes may be passed.
  def initialize(opts={})
    # Filter unrecognized fields just in case, by selecting the known members
    # into a new hash instead of deleting from the caller's hash.
    super(opts.slice(*members))
  end

  # Serializes the members to JSON, leaving out every member that is nil.
  #
  # @param args arguments forwarded to Hash#to_json.
  # @return [String]
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#num_waiting ⇒ Integer

Returns:

  • (Integer)


1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Information about a JetStream consumer, built from a Symbol-keyed hash.
#
# Nested hashes are wrapped in SequenceInfo / ConsumerConfig, the creation
# time is parsed into a Time, and the resulting instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] consumer info keyed by Symbol. +:created+ must be a
  #   time string; +:ack_floor+, +:delivered+ and +:config+ must be hashes,
  #   and +:config+ must carry a numeric +:ack_wait+. Keys that are not
  #   members of this struct are ignored.
  def initialize(opts={})
    # Build the attributes from copies instead of rewriting +opts+ in place.
    # The in-place version left the caller's hash converted, so a second
    # ConsumerInfo.new on the same hash failed in Time.parse and would have
    # rescaled ack_wait again.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor].dup)
    attrs[:delivered] = SequenceInfo.new(opts[:delivered].dup)
    # ack_wait is scaled down by 1_000_000_000 (presumably nanoseconds to
    # seconds -- confirm against the server API); Integer division truncates.
    scaled_config = opts[:config].merge(ack_wait: opts[:config][:ack_wait] / 1_000_000_000)
    attrs[:config] = ConsumerConfig.new(scaled_config)
    # The cluster member is deliberately left unset.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#replay_policy ⇒ String

Returns:

  • (String)


1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/nats/io/js.rb', line 1139

# Configuration of a JetStream consumer.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
ConsumerConfig = Struct.new(:durable_name, :description,
                            :deliver_policy, :opt_start_seq, :opt_start_time,
                            :ack_policy, :ack_wait, :max_deliver, :backoff,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,

                            # Pull based options
                            :max_batch, :max_expires,
                            # Push based consumers
                            :deliver_subject, :deliver_group,
                            # Ephemeral inactivity threshold
                            :inactive_threshold,
                            # Generally inherited by parent stream and other markers,
                            # now can be configured directly.
                            :num_replicas,
                            # Force memory storage
                            :memory_storage,
                            keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#retention ⇒ String

Returns:

  • (String)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Configuration of a JetStream stream.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#state ⇒ StreamState

Returns:

  • (StreamState)

1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Configuration of a JetStream stream.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#storage ⇒ String

Returns:

  • (String)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Configuration of a JetStream stream.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#stream_name ⇒ String

Returns name of the stream to which the consumer belongs.

Returns:

  • (String)

    name of the stream to which the consumer belongs.



1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
# File 'lib/nats/io/js.rb', line 1101

# Information about a JetStream consumer, built from a Symbol-keyed hash.
#
# Nested hashes are wrapped in SequenceInfo / ConsumerConfig, the creation
# time is parsed into a Time, and the resulting instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash] consumer info keyed by Symbol. +:created+ must be a
  #   time string; +:ack_floor+, +:delivered+ and +:config+ must be hashes,
  #   and +:config+ must carry a numeric +:ack_wait+. Keys that are not
  #   members of this struct are ignored.
  def initialize(opts={})
    # Build the attributes from copies instead of rewriting +opts+ in place.
    # The in-place version left the caller's hash converted, so a second
    # ConsumerInfo.new on the same hash failed in Time.parse and would have
    # rescaled ack_wait again.
    attrs = opts.slice(*members)
    attrs[:created] = Time.parse(opts[:created])
    attrs[:ack_floor] = SequenceInfo.new(opts[:ack_floor].dup)
    attrs[:delivered] = SequenceInfo.new(opts[:delivered].dup)
    # ack_wait is scaled down by 1_000_000_000 (presumably nanoseconds to
    # seconds -- confirm against the server API); Integer division truncates.
    scaled_config = opts[:config].merge(ack_wait: opts[:config][:ack_wait] / 1_000_000_000)
    attrs[:config] = ConsumerConfig.new(scaled_config)
    # The cluster member is deliberately left unset.
    attrs.delete(:cluster)
    super(attrs)
    freeze
  end
end

#stream_seq ⇒ Integer

Returns The stream sequence.

Returns:

  • (Integer)

    The stream sequence.



1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
# File 'lib/nats/io/js.rb', line 1066

# Consumer/stream sequence pair together with a last-active marker.
#
# Instances are frozen as soon as they are built.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
    freeze
  end
end

#subjects ⇒ Array

Returns:

  • (Array)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Configuration of a JetStream stream.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#type ⇒ String

Returns:

  • (String)


1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
# File 'lib/nats/io/js.rb', line 1210

# Configuration of a JetStream stream.
#
# Keys passed to the constructor that are not members of the struct are
# ignored, and #to_json leaves out members that are unset.
StreamConfig = Struct.new(:name, :description, :subjects, :retention, :max_consumers,
                          :max_msgs, :max_bytes, :discard, :max_age,
                          :max_msgs_per_subject, :max_msg_size,
                          :storage, :num_replicas, :no_ack, :duplicate_window,
                          :placement, :allow_direct,
                          keyword_init: true) do
  # @param opts [Hash] member values keyed by Symbol; unrecognized keys are
  #   dropped.
  def initialize(opts={})
    # Select the recognized fields into a new hash rather than deleting from
    # +opts+, so the caller's hash is left untouched.
    super(opts.slice(*members))
  end

  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object holding only the non-nil members.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end