Module: NATS::JetStream::API

Defined in:
lib/nats/io/js.rb

Overview

JetStream::API contains the types used to interact with the JetStream API.

Defined Under Namespace

Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState

Constant Summary collapse

Error =

When the server responds with an error from the JetStream API.

::NATS::JetStream::Error::APIError

Instance Attribute Summary collapse

Instance Attribute Details

#ack_floorSequenceInfo

Returns:

  • (SequenceInfo)

1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#ack_policyString

Returns:

  • (String)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#ack_waitInteger

Returns:

  • (Integer)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#bytesInteger

Returns:

  • (Integer)


1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'lib/nats/io/js.rb', line 1255

# Stream counters (messages, bytes, first/last sequence and timestamp,
# consumer count) as reported by the JetStream API.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end
end

#clusterHash

Returns:

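  • (Hash, nil)

    Note: the initializer shown below discards the :cluster entry before the struct is populated, so this attribute is currently always nil.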


1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#configConsumerConfig

Returns consumer configuration.

Returns:

  • (ConsumerConfig)

1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#consumer_countInteger

Returns:

  • (Integer)


1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'lib/nats/io/js.rb', line 1255

# Stream counters (messages, bytes, first/last sequence and timestamp,
# consumer count) as reported by the JetStream API.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end
end

#consumer_seqInteger

Returns the consumer sequence.

Returns:

  • (Integer)

    The consumer sequence.



1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
# File 'lib/nats/io/js.rb', line 1065

# Consumer/stream sequence pair plus +last_active+, as reported by the
# JetStream API. Instances are frozen on construction.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # and treat nil as "no attributes" rather than raising on nil.keys.
    super((opts || {}).select { |key, _value| fields.include?(key) })
    freeze
  end
end

#createdTime

Returns the time when the consumer was created (the initializer shown below parses the API's timestamp string with Time.parse).

Returns:

  • (Time)

    the time when the consumer was created.



1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#deliver_policyString

Returns:

  • (String)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#deliveredSequenceInfo

Returns:

  • (SequenceInfo)

1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#durable_nameString

Returns:

  • (String)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#first_seqInteger

Returns:

  • (Integer)


1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'lib/nats/io/js.rb', line 1255

# Stream counters (messages, bytes, first/last sequence and timestamp,
# consumer count) as reported by the JetStream API.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end
end

#last_seqInteger

Returns:

  • (Integer)


1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'lib/nats/io/js.rb', line 1255

# Stream counters (messages, bytes, first/last sequence and timestamp,
# consumer count) as reported by the JetStream API.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end
end

#max_ack_pendingInteger

Returns:

  • (Integer)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_deliverInteger

Returns:

  • (Integer)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#max_waitingInteger

Returns:

  • (Integer)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#messagesInteger

Returns:

  • (Integer)


1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'lib/nats/io/js.rb', line 1255

# Stream counters (messages, bytes, first/last sequence and timestamp,
# consumer count) as reported by the JetStream API.
StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts,
                         :last_seq, :last_ts, :consumer_count,
                         keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end
end

#nameString

Returns name of the consumer.

Returns:

  • (String)

    name of the consumer.



1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#num_ack_pendingInteger

Returns:

  • (Integer)


1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#num_pendingInteger

Returns:

  • (Integer)


1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#num_redeliveredInteger

Returns:

  • (Integer)


1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#num_waitingInteger

Returns:

  • (Integer)


1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#replay_policyString

Returns:

  • (String)


1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
# File 'lib/nats/io/js.rb', line 1138

# Consumer configuration exchanged with the JetStream API.
ConsumerConfig = Struct.new(:durable_name, :description, :deliver_subject,
                            :deliver_group, :deliver_policy, :opt_start_seq,
                            :opt_start_time, :ack_policy, :ack_wait, :max_deliver,
                            :filter_subject, :replay_policy, :rate_limit_bps,
                            :sample_freq, :max_waiting, :max_ack_pending,
                            :flow_control, :idle_heartbeat, :headers_only,
                            keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # which also makes frozen hashes acceptable input.
    super((opts || {}).select { |key, _value| fields.include?(key) })
  end

  # Serialises only the members that are set; nil members are omitted.
  #
  # @param args forwarded unchanged to Hash#to_json.
  # @return [String] JSON object text.
  def to_json(*args)
    to_h.compact.to_json(*args)
  end
end

#stream_nameString

Returns name of the stream to which the consumer belongs.

Returns:

  • (String)

    name of the stream to which the consumer belongs.



1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
# File 'lib/nats/io/js.rb', line 1100

# Consumer state as returned by the JetStream API.
#
# On construction the raw attributes are normalised: +created+ is parsed
# into a Time, +delivered+ and +ack_floor+ become SequenceInfo structs,
# +config+ becomes a ConsumerConfig whose +ack_wait+ is integer-divided by
# 1_000_000_000 (presumably nanoseconds to seconds — confirm), and the
# instance is frozen.
ConsumerInfo = Struct.new(:type, :stream_name, :name, :created,
                          :config, :delivered, :ack_floor,
                          :num_ack_pending, :num_redelivered, :num_waiting,
                          :num_pending, :cluster, :push_bound,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. Neither the hash nor its
  #   nested +:config+ hash is modified.
  def initialize(opts={})
    fields = members
    # Work on a filtered copy so the caller's hash is never modified.
    attrs = (opts || {}).select { |key, _value| fields.include?(key) }

    # Only parse when a timestamp is present; Time.parse(nil) raises.
    attrs[:created] = Time.parse(attrs[:created]) if attrs[:created]
    attrs[:ack_floor] = SequenceInfo.new(attrs[:ack_floor] || {})
    attrs[:delivered] = SequenceInfo.new(attrs[:delivered] || {})

    # Copy the nested config before rescaling ack_wait, and skip the
    # division when ack_wait is absent instead of calling / on nil.
    config = (attrs[:config] || {}).dup
    config[:ack_wait] /= 1_000_000_000 if config[:ack_wait]
    attrs[:config] = ConsumerConfig.new(config)

    # NOTE(review): :cluster is a member but is always discarded here, so
    # #cluster reads as nil — confirm this is intended.
    attrs.delete(:cluster)

    super(attrs)
    freeze
  end
end

#stream_seqInteger

Returns the stream sequence.

Returns:

  • (Integer)

    The stream sequence.



1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
# File 'lib/nats/io/js.rb', line 1065

# Consumer/stream sequence pair plus +last_active+, as reported by the
# JetStream API. Instances are frozen on construction.
SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active,
                          keyword_init: true) do
  # @param opts [Hash, nil] attributes keyed by member name (symbols); keys
  #   that are not struct members are ignored. The hash is not modified.
  def initialize(opts={})
    fields = members
    # Select into a new hash instead of deleting from the caller's hash,
    # and treat nil as "no attributes" rather than raising on nil.keys.
    super((opts || {}).select { |key, _value| fields.include?(key) })
    freeze
  end
end