Class: NATS::JetStream

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/jetstream.rb,
lib/nats/io/jetstream/js.rb,
lib/nats/io/jetstream/api.rb,
lib/nats/io/jetstream/msg.rb,
lib/nats/io/jetstream/errors.rb,
lib/nats/io/jetstream/js/sub.rb,
lib/nats/io/jetstream/manager.rb,
lib/nats/io/jetstream/msg/ack.rb,
lib/nats/io/jetstream/js/config.rb,
lib/nats/io/jetstream/js/header.rb,
lib/nats/io/jetstream/js/status.rb,
lib/nats/io/jetstream/msg/metadata.rb,
lib/nats/io/jetstream/msg/ack_methods.rb,
lib/nats/io/jetstream/pull_subscription.rb,
lib/nats/io/jetstream/push_subscription.rb

Overview

JetStream returns a context with an API similar to that of NATS::Client, but with enhanced functions to persist and consume messages from the NATS JetStream engine.

Examples:

nc = NATS.connect("demo.nats.io")
js = nc.jetstream()

Defined Under Namespace

Modules: API, Manager, Msg Classes: Error, PubAck

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn, params = {}) ⇒ JetStream

Create a new JetStream context for a NATS connection.

Parameters:

  • conn (NATS::Client)
  • params (Hash) (defaults to: {})

    Options to customize JetStream context.

Options Hash (params):

  • :prefix (String)

    JetStream API prefix to use for the requests.

  • :domain (String)

    JetStream Domain to use for the requests.

  • :timeout (Float)

    Default timeout to use for JS requests.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/nats/io/jetstream.rb', line 44

# Create a new JetStream context for a NATS connection.
#
# The API prefix is resolved in priority order: an explicit +:prefix+,
# then one derived from +:domain+ ("$JS.<domain>.API"), and finally
# +JS::DefaultAPIPrefix+.
#
# @param conn [NATS::Client] connection used for the JetStream requests.
# @param params [Hash] options to customize the JetStream context. The hash
#   is copied; the caller's object is left untouched.
# @option params [String] :prefix JetStream API prefix to use for the requests.
# @option params [String] :domain JetStream Domain to use for the requests.
# @option params [Float] :timeout Default timeout to use for JS requests
#   (defaults to 5 seconds).
def initialize(conn, params = {})
  @nc = conn
  @prefix = if params[:prefix]
    params[:prefix]
  elsif params[:domain]
    "$JS.#{params[:domain]}.API"
  else
    JS::DefaultAPIPrefix
  end

  # Work on a copy so that filling in defaults never modifies (or fails on)
  # a hash owned by the caller, e.g. a shared or frozen options constant.
  @opts = params.dup
  @opts[:timeout] ||= 5 # seconds
  @opts[:prefix] = @prefix

  # Mix the management APIs into this particular context instance.
  extend Manager
  extend KeyValue::Manager
end

Instance Attribute Details

#nc ⇒ Object (readonly)

Returns the value of attribute nc.



35
36
37
# File 'lib/nats/io/jetstream.rb', line 35

# Reader for the NATS connection held by this JetStream context.
#
# @return [NATS::Client] the connection that was passed to the constructor.
def nc
  @nc
end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



35
36
37
# File 'lib/nats/io/jetstream.rb', line 35

# Reader for the options of this JetStream context.
#
# @return [Hash] the options given to the constructor, which also stores the
#   effective +:timeout+ and the resolved +:prefix+ in it.
def opts
  @opts
end

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



35
36
37
# File 'lib/nats/io/jetstream.rb', line 35

# Reader for the JetStream API prefix used by this context.
#
# @return [String] the prefix resolved by the constructor (explicit
#   +:prefix+, derived from +:domain+, or +JS::DefaultAPIPrefix+).
def prefix
  @prefix
end

Instance Method Details

#publish(subject, payload = "", **params) ⇒ PubAck

publish produces a message for JetStream.

Parameters:

  • subject (String)

    The subject from a stream where the message will be sent.

  • payload (String) (defaults to: "")

    The payload of the message.

  • params (Hash)

    Options to customize the publish message request.

Options Hash (**params):

  • :timeout (Float)

    Time to wait for a PubAck response or an error.

  • :header (Hash)

    NATS Headers to use for the message.

  • :stream (String)

    Expected Stream to which the message is being published.

Returns:

  • (PubAck)

    The pub ack response.

Raises:

  • (NATS::Timeout)

    When it takes too long to receive an ack response.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/nats/io/jetstream.rb', line 84

# publish produces a message for JetStream and waits for the ack.
#
# @param subject [String] The subject from a stream where the message will be sent.
# @param payload [String] The payload of the message.
# @param params [Hash] Options to customize the publish message request.
#   All of them are also forwarded to the connection's +request_msg+.
# @option params [Float] :timeout Time to wait for a PubAck response or an
#   error (defaults to the context's +:timeout+ option).
# @option params [Hash] :header NATS Headers to use for the message. The
#   given hash is not modified.
# @option params [String] :stream Expected Stream to which the message is
#   being published; sent as the +JS::Header::ExpectedStream+ header.
# @return [PubAck] The pub ack response.
# @raise [NATS::Timeout] When it takes too long to receive an ack response.
# @raise [JetStream::Error::NoStreamResponse] When there are no responders
#   for the subject.
# @raise the error built by +JS.from_error+ when the response carries an
#   +:error+ entry.
def publish(subject, payload = "", **params)
  params[:timeout] ||= @opts[:timeout]
  if params[:stream]
    # Merge into a new hash so the caller's header hash is never modified.
    params[:header] = (params[:header] || {}).merge(
      JS::Header::ExpectedStream => params[:stream]
    )
  end

  # Send message with headers.
  msg = NATS::Msg.new(
    subject: subject,
    data: payload,
    header: params[:header]
  )

  begin
    resp = @nc.request_msg(msg, **params)
    result = JSON.parse(resp.data, symbolize_names: true)
  rescue ::NATS::IO::NoRespondersError
    raise JetStream::Error::NoStreamResponse.new("nats: no response from stream")
  end
  raise JS.from_error(result[:error]) if result[:error]

  PubAck.new(result)
end

#pull_subscribe(subject, durable, params = {}) ⇒ NATS::JetStream::PullSubscription

pull_subscribe binds or creates a subscription to a JetStream pull consumer.

Parameters:

  • subject (String, Array)

    Subject or subjects from which the messages will be fetched.

  • durable (String)

    Consumer durable name from where the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PullSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PullSubscription will be bound.

  • :name (String)

    Name of the Consumer to which the PullSubscription will be bound.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PullSubscription)


267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/nats/io/jetstream.rb', line 267

# pull_subscribe binds or creates a subscription to a JetStream pull consumer.
#
# The consumer name is taken from +params[:consumer]+, then +durable+, then
# +params[:name]+. When no such consumer exists on the stream one is created
# from +params[:config]+, unless +:stream+ was given for a single subject, in
# which case the lookup error is re-raised.
#
# @param subject [String, Array] Subject or subjects from which the messages
#   will be fetched.
# @param durable [String, nil] Consumer durable name from where the messages
#   will be fetched. A blank value is treated as not given.
# @param params [Hash] Options to customize the PullSubscription. The given
#   hash is not modified.
# @option params [String] :stream Name of the Stream to which the consumer belongs.
# @option params [String] :consumer Name of the Consumer to which the
#   PullSubscription will be bound.
# @option params [String] :name Name of the Consumer to which the
#   PullSubscription will be bound.
# @option params [Hash, JetStream::API::ConsumerConfig] :config Configuration
#   for the consumer.
# @return [NATS::JetStream::PullSubscription]
# @raise [JetStream::Error::InvalidDurableName] when neither a durable nor a
#   consumer name is available.
# @raise [NATS::JetStream::Error] when several subjects are given and one has
#   no matching stream, or they match more than one stream.
def pull_subscribe(subject, durable, params = {})
  # A blank durable counts as "not given"; otherwise the empty string would
  # win over params[:name] below and become the consumer name.
  durable = nil if durable && durable.empty?
  consumer = params[:consumer] || durable || params[:name]
  unless consumer
    raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name")
  end

  # A one-element list is handled like a plain subject.
  multi_filter = subject.is_a?(Array) && subject.size > 1
  subject = subject.first if subject.is_a?(Array) && subject.size == 1

  stream = if params[:stream].nil?
    if multi_filter
      # Resolve the stream of every subject.
      streams = subject.map do |s|
        find_stream_name_by_subject(s)
      rescue NATS::JetStream::Error::NotFound
        raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
      end

      # Ensure that the filter subjects are not ambiguous.
      streams.uniq!
      if streams.count > 1
        raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
      end

      streams.first
    else
      find_stream_name_by_subject(subject)
    end
  else
    params[:stream]
  end

  begin
    consumer_info(stream, consumer)
  rescue NATS::JetStream::Error::NotFound => e
    # If attempting to bind, then this is a hard error.
    raise e if params[:stream] && !multi_filter

    config = if !(params[:config])
      JetStream::API::ConsumerConfig.new
    elsif params[:config].is_a?(JetStream::API::ConsumerConfig)
      params[:config]
    else
      JetStream::API::ConsumerConfig.new(params[:config])
    end
    config[:durable_name] = durable
    config[:ack_policy] ||= JS::Config::AckExplicit
    if multi_filter
      config[:filter_subjects] ||= subject
    else
      config[:filter_subject] ||= subject
    end
    add_consumer(stream, config)
  end

  # Subscribe to a fresh inbox and attach the pull subscription behaviour.
  sub = @nc.subscribe(@nc.new_inbox)
  sub.extend(PullSubscription)
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: consumer,
    nms: "#{@prefix}.CONSUMER.MSG.NEXT.#{stream}.#{consumer}"
  )
  sub
end

#subscribe(subject, params = {}, &cb) ⇒ NATS::JetStream::PushSubscription

subscribe binds or creates a push subscription to a JetStream push consumer.

Parameters:

  • subject (String, Array)

    Subject(s) from which the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PushSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PushSubscription will be bound.

  • :name (String)

    Name of the Consumer to which the PushSubscription will be bound.

  • :durable (String)

    Consumer durable name from where the messages will be fetched.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PushSubscription)


117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/jetstream.rb', line 117

def subscribe(subject, params = {}, &cb)
  params[:consumer] ||= params[:durable]
  params[:consumer] ||= params[:name]
  multi_filter = if subject.is_a?(Array) && (subject.size == 1)
    subject = subject.first
    false
  elsif subject.is_a?(Array) && (subject.size > 1)
    true
  end

  stream = if params[:stream].nil?
    if multi_filter
      # Use the first subject to try to find the stream.
      streams = subject.map do |s|
        find_stream_name_by_subject(s)
      rescue NATS::JetStream::Error::NotFound
        raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
      end

      # Ensure that the filter subjects are not ambiguous.
      streams.uniq!
      if streams.count > 1
        raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
      end

      streams.first
    else
      find_stream_name_by_subject(subject)
    end
  else
    params[:stream]
  end

  queue = params[:queue]
  durable = params[:durable]
  params[:flow_control]
  manual_ack = params[:manual_ack]
  idle_heartbeat = params[:idle_heartbeat]
  flow_control = params[:flow_control]
  config = params[:config]

  if queue
    if durable && (durable != queue)
      raise NATS::JetStream::Error.new("nats: cannot create queue subscription '#{queue}' to consumer '#{durable}'")
    else
      durable = queue
    end
  end

  cinfo = nil
  consumer_found = false
  should_create = false

  if !durable
    should_create = true
  else
    begin
      cinfo = consumer_info(stream, durable)
      config = cinfo.config
      consumer_found = true
      consumer = durable
    rescue NATS::JetStream::Error::NotFound
      should_create = true
      consumer_found = false
    end
  end

  if consumer_found
    if !config.deliver_group
      if queue
        raise NATS::JetStream::Error.new("nats: cannot create a queue subscription for a consumer without a deliver group")
      elsif cinfo.push_bound
        raise NATS::JetStream::Error.new("nats: consumer is already bound to a subscription")
      end
    elsif !queue
      raise NATS::JetStream::Error.new("nats: cannot create a subscription for a consumer with a deliver group #{config.deliver_group}")
    elsif queue != config.deliver_group
      raise NATS::JetStream::Error.new("nats: cannot create a queue subscription #{queue} for a consumer with a deliver group #{config.deliver_group}")
    end
  elsif should_create
    # Auto-create consumer if none found.
    if config.nil?
      # Defaults
      config = JetStream::API::ConsumerConfig.new({ack_policy: "explicit"})
    elsif config.is_a?(Hash)
      config = JetStream::API::ConsumerConfig.new(config)
    elsif !config.is_a?(JetStream::API::ConsumerConfig)
      raise NATS::JetStream::Error.new("nats: invalid ConsumerConfig")
    end

    config.durable_name = durable if !config.durable_name
    config.deliver_group = queue if !config.deliver_group

    # Create inbox for push consumer.
    deliver = @nc.new_inbox
    config.deliver_subject = deliver

    # Auto created consumers use the filter subject.
    if multi_filter
      config[:filter_subjects] ||= subject
    else
      config[:filter_subject] ||= subject
    end

    # Heartbeats / FlowControl
    config.flow_control = flow_control
    if idle_heartbeat || config.idle_heartbeat
      idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat
      config.idle_heartbeat = idle_heartbeat
    end

    # Auto create the consumer.
    cinfo = add_consumer(stream, config)
    consumer = cinfo.name
  end

  # Enable auto acking for async callbacks unless disabled.
  # In case ack policy is none then we also do not require to ack.
  if cb && !manual_ack && (config.ack_policy != "none")
    ocb = cb
    new_cb = proc do |msg|
      ocb.call(msg)
      begin
        msg.ack
      rescue
        JetStream::Error::MsgAlreadyAckd
      end
    end
    cb = new_cb
  end
  sub = @nc.subscribe(config.deliver_subject, queue: config.deliver_group, &cb)
  sub.extend(PushSubscription)
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: consumer
  )
  sub
end