Module: NATS::JetStream::Manager

Defined in:
lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb

Overview

A JetStream::Manager can be used to make requests to the JetStream API.

Examples:

require 'nats/client'

nc = NATS.connect("demo.nats.io")

config = JetStream::API::StreamConfig.new()
nc.jsm.add_stream(config)

Instance Method Summary collapse

Instance Method Details

#account_infoObject



236
237
238
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 236

def 
  api_request("#{@prefix}.INFO")
end

#add_consumer(stream, config, params = {}) ⇒ JetStream::API::ConsumerInfo

add_consumer creates a consumer with a given config.

Parameters:

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 102

def add_consumer(stream, config, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  config = if not config.is_a?(JetStream::API::ConsumerConfig)
             JetStream::API::ConsumerConfig.new(config)
           else
             config
           end

  req_subject = case
                when config[:name]
                  # NOTE: Only supported after nats-server v2.9.0
                  if config[:filter_subject] && config[:filter_subject] != ">"
                    "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}"
                  else
                    "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}"
                  end
                when config[:durable_name]
                  "#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}"
                else
                  "#{@prefix}.CONSUMER.CREATE.#{stream}"
                end

  config[:ack_policy] ||= JS::Config::AckExplicit
  # Check if have to normalize ack wait so that it is in nanoseconds for Go compat.
  if config[:ack_wait]
    raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer)
    config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS
  end
  if config[:inactive_threshold]
    raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer)
    config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS
  end

  cfg = config.to_h.compact
  req = {
    stream_name: stream,
    config: cfg
  }

  result = api_request(req_subject, req.to_json, params)
  JetStream::API::ConsumerInfo.new(result).freeze
end

#add_stream(config, params = {}) ⇒ JetStream::API::StreamCreateResponse

add_stream creates a stream with a given config.

Parameters:

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 34

def add_stream(config, params={})
  config = if not config.is_a?(JetStream::API::StreamConfig)
             JetStream::API::StreamConfig.new(config)
           else
             config
           end
  stream = config[:name]
  raise ArgumentError.new(":name is required to create streams") unless stream
  raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/
  req_subject = "#{@prefix}.STREAM.CREATE.#{stream}"

  cfg = config.to_h.compact
  result = api_request(req_subject, cfg.to_json, params)
  JetStream::API::StreamCreateResponse.new(result)
end

#consumer_info(stream, consumer, params = {}) ⇒ JetStream::API::ConsumerInfo

consumer_info retrieves the current status of a consumer.

Parameters:

  • stream (String)

    Name of the stream.

  • consumer (String)

    Name of the consumer.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



151
152
153
154
155
156
157
158
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 151

def consumer_info(stream, consumer, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty?

  req_subject = "#{@prefix}.CONSUMER.INFO.#{stream}.#{consumer}"
  result = api_request(req_subject, '', params)
  JetStream::API::ConsumerInfo.new(result)
end

#delete_consumer(stream, consumer, params = {}) ⇒ Boolean

delete_consumer deletes a consumer.

Parameters:

  • stream (String)

    Name of the stream.

  • consumer (String)

    Name of the consumer.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (Boolean)

Raises:



166
167
168
169
170
171
172
173
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 166

def delete_consumer(stream, consumer, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty?

  req_subject = "#{@prefix}.CONSUMER.DELETE.#{stream}.#{consumer}"
  result = api_request(req_subject, '', params)
  result[:success]
end

#delete_stream(stream, params = {}) ⇒ Boolean

delete_stream deletes a stream.

Parameters:

  • stream (String)

    Name of the stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (Boolean)

Raises:



88
89
90
91
92
93
94
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 88

def delete_stream(stream, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?

  req_subject = "#{@prefix}.STREAM.DELETE.#{stream}"
  result = api_request(req_subject, '', params)
  result[:success]
end

#find_stream_name_by_subject(subject, params = {}) ⇒ String

find_stream_name_by_subject does a lookup for the stream to which the subject belongs.

Parameters:

  • subject (String)

    The subject that belongs to a stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (String)

    The name of the JetStream stream for the subject.

Raises:



181
182
183
184
185
186
187
188
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 181

def find_stream_name_by_subject(subject, params={})
  req_subject = "#{@prefix}.STREAM.NAMES"
  req = { subject: subject }
  result = api_request(req_subject, req.to_json, params)
  raise JetStream::Error::NotFound unless result[:streams]

  result[:streams].first
end

#get_last_msg(stream_name, subject, params = {}) ⇒ Object



231
232
233
234
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 231

def get_last_msg(stream_name, subject, params={})
  params[:subject] = subject
  get_msg(stream_name, params)
end

#get_msg(stream_name, params = {}) ⇒ Object

get_msg retrieves a message from the stream.

Parameters:

  • stream_name (String)

    The stream_name.

  • params (Hash) (defaults to: {})

    Options to customize API request.

  • next (Hash)

    a customizable set of options

  • seq (Hash)

    a customizable set of options

  • subject (Hash)

    a customizable set of options

  • direct (Hash)

    a customizable set of options



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 197

def get_msg(stream_name, params={})
  req = {}
  case
  when params[:next]
    req[:seq] = params[:seq]
    req[:next_by_subj] = params[:subject]
  when params[:seq]
    req[:seq] = params[:seq]
  when params[:subject]
    req[:last_by_subj] = params[:subject]
  end

  data = req.to_json
  if params[:direct]
    if params[:subject] and not params[:seq]
      # last_by_subject type request requires no payload.
      data = ''
      req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}.#{params[:subject]}"
    else
      req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}"
    end
  else
    req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}"
  end
  resp = api_request(req_subject, data, direct: params[:direct])
  msg = if params[:direct]
          _lift_msg_to_raw_msg(resp)
        else
          JetStream::API::RawStreamMsg.new(resp[:message])
        end

  msg
end

#stream_info(stream, params = {}) ⇒ JetStream::API::StreamInfo

stream_info retrieves the current status of a stream.

Parameters:

  • stream (String)

    Name of the stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



55
56
57
58
59
60
61
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 55

def stream_info(stream, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?

  req_subject = "#{@prefix}.STREAM.INFO.#{stream}"
  result = api_request(req_subject, '', params)
  JetStream::API::StreamInfo.new(result)
end

#update_stream(config, params = {}) ⇒ JetStream::API::StreamCreateResponse

update_stream edits an existed stream with a given config.

Parameters:

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:

  • (ArgumentError)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/jetstream/manager.rb', line 68

def update_stream(config, params={})
  config = if not config.is_a?(JetStream::API::StreamConfig)
             JetStream::API::StreamConfig.new(config)
           else
             config
           end
  stream = config[:name]
  raise ArgumentError.new(":name is required to create streams") unless stream
  raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/
  req_subject = "#{@prefix}.STREAM.UPDATE.#{stream}"
  cfg = config.to_h.compact
  result = api_request(req_subject, cfg.to_json, params)
  JetStream::API::StreamCreateResponse.new(result)
end