Class: NATS::JetStream

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/jetstream.rb,
lib/nats/io/jetstream/js.rb,
lib/nats/io/jetstream/api.rb,
lib/nats/io/jetstream/msg.rb,
lib/nats/io/jetstream/errors.rb,
lib/nats/io/jetstream/js/sub.rb,
lib/nats/io/jetstream/manager.rb,
lib/nats/io/jetstream/msg/ack.rb,
lib/nats/io/jetstream/js/config.rb,
lib/nats/io/jetstream/js/header.rb,
lib/nats/io/jetstream/js/status.rb,
lib/nats/io/jetstream/msg/metadata.rb,
lib/nats/io/jetstream/msg/ack_methods.rb,
lib/nats/io/jetstream/pull_subscription.rb,
lib/nats/io/jetstream/push_subscription.rb

Overview

JetStream returns a context with an API similar to that of NATS::Client, but with enhanced functions to persist and consume messages from the NATS JetStream engine.

Examples:

nc = NATS.connect("demo.nats.io")
js = nc.jetstream()

Defined Under Namespace

Modules: API, Manager, Msg Classes: Error, PubAck

Instance Method Summary collapse

Constructor Details

#initialize(conn, params = {}) ⇒ JetStream

Create a new JetStream context for a NATS connection.

Parameters:

  • conn (NATS::Client)
  • params (Hash) (defaults to: {})

    Options to customize JetStream context.

Options Hash (params):

  • :prefix (String)

    JetStream API prefix to use for the requests.

  • :domain (String)

    JetStream Domain to use for the requests.

  • :timeout (Float)

    Default timeout to use for JS requests.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/nats/io/jetstream.rb', line 43

# Create a new JetStream context for a NATS connection.
#
# The API prefix is resolved in priority order: an explicit +:prefix+,
# then a prefix derived from +:domain+, then JS::DefaultAPIPrefix.
#
# @param conn [NATS::Client] connection stored and used by this context.
# @param params [Hash] options to customize the JetStream context. The hash
#   is copied, so the caller's object is never modified (and may be frozen).
# @option params [String] :prefix JetStream API prefix to use for the requests.
# @option params [String] :domain JetStream Domain to use for the requests.
# @option params [Float] :timeout Default timeout to use for JS requests
#   (5 seconds when not given).
def initialize(conn, params={})
  @nc = conn
  @prefix = if params[:prefix]
              params[:prefix]
            elsif params[:domain]
              "$JS.#{params[:domain]}.API"
            else
              JS::DefaultAPIPrefix
            end

  # Keep a private copy of the options: the default timeout and the resolved
  # prefix are stored on the context, not written back into the caller's hash.
  @opts = params.dup
  @opts[:timeout] ||= 5 # seconds
  @opts[:prefix] = @prefix

  # Mix the management APIs into this context instance.
  extend Manager
  extend KeyValue::Manager
end

Instance Method Details

#publish(subject, payload = "", **params) ⇒ PubAck

publish produces a message for JetStream.

Parameters:

  • subject (String)

    The subject from a stream where the message will be sent.

  • payload (String) (defaults to: "")

    The payload of the message.

  • params (Hash)

    Options to customize the publish message request.

Options Hash (**params):

  • :timeout (Float)

    Time to wait for a PubAck response or an error.

  • :header (Hash)

    NATS Headers to use for the message.

  • :stream (String)

    Expected Stream to which the message is being published.

Returns:

  • (PubAck)

    The pub ack response.

Raises:

  • (NATS::Timeout)

    When it takes too long to receive an ack response.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/nats/io/jetstream.rb', line 83

# Publish a message to JetStream and wait for the stream's acknowledgement.
#
# @param subject [String] The subject from a stream where the message will be sent.
# @param payload [String] The payload of the message.
# @param params [Hash] Options to customize the publish message request;
#   they are also forwarded to the connection's +request_msg+.
# @option params [Float] :timeout Time to wait for a PubAck response or an
#   error (defaults to the context timeout).
# @option params [Hash] :header NATS Headers to use for the message.
# @option params [String] :stream Expected Stream to which the message is
#   being published; sent as the JS::Header::ExpectedStream header.
# @return [PubAck] The pub ack response.
# @raise [JetStream::Error::NoStreamResponse] When the request finds no responders.
# @raise [NATS::Timeout] When it takes too long to receive an ack response
#   (presumably raised by the underlying request — not visible here).
def publish(subject, payload="", **params)
  params[:timeout] ||= @opts[:timeout]
  if params[:stream]
    # Merge into a new hash instead of editing the caller's header hash;
    # otherwise a reused hash would keep the expected-stream header on
    # later publishes that do not pass :stream.
    params[:header] = (params[:header] || {}).merge(
      JS::Header::ExpectedStream => params[:stream]
    )
  end

  # Send message with headers.
  msg = NATS::Msg.new(subject: subject,
                      data: payload,
                      header: params[:header])

  begin
    resp = @nc.request_msg(msg, **params)
    result = JSON.parse(resp.data, symbolize_names: true)
  rescue ::NATS::IO::NoRespondersError
    raise JetStream::Error::NoStreamResponse.new("nats: no response from stream")
  end
  # The server reports failures inside the JSON body rather than as a transport error.
  raise JS.from_error(result[:error]) if result[:error]

  PubAck.new(result)
end

#pull_subscribe(subject, durable, params = {}) ⇒ NATS::JetStream::PullSubscription

pull_subscribe binds or creates a subscription to a JetStream pull consumer.

Parameters:

  • subject (String, Array)

    Subject or subjects from which the messages will be fetched.

  • durable (String)

    Consumer durable name from where the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PullSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PullSubscription will be bound.

  • :name (String)

    Name of the Consumer to which the PullSubscription will be bound.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PullSubscription)


268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/nats/io/jetstream.rb', line 268

# Bind to, or create, a JetStream pull consumer and return a subscription
# extended with PullSubscription.
#
# The consumer name is taken from +params[:consumer]+, then +durable+, then
# +params[:name]+. An empty +durable+ is treated exactly like +nil+, so it
# neither shadows +:name+ nor is sent as an empty durable name. The caller's
# +params+ hash is only read, never modified.
#
# @param subject [String, Array] Subject or subjects from which the messages
#   will be fetched. A one-element array is treated as a single subject.
# @param durable [String] Consumer durable name from where the messages will be fetched.
# @param params [Hash] Options to customize the PullSubscription.
# @option params [String] :stream Name of the Stream to which the consumer
#   belongs; looked up from the subject(s) when omitted.
# @option params [String] :consumer Name of the Consumer to which the
#   PullSubscription will be bound.
# @option params [String] :name Name of the Consumer to which the
#   PullSubscription will be bound.
# @option params [Hash] :config Configuration for the consumer (a Hash or a
#   JetStream::API::ConsumerConfig), used only when the consumer is created.
# @return [NATS::JetStream::PullSubscription]
# @raise [JetStream::Error::InvalidDurableName] when no consumer name can be
#   determined from +durable+, +:consumer+ or +:name+.
# @raise [NATS::JetStream::Error] when a filter subject matches no stream or
#   the filter subjects match more than one stream.
def pull_subscribe(subject, durable, params={})
  durable = nil if durable && durable.empty?
  consumer = params[:consumer] || durable || params[:name]
  unless consumer
    raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name")
  end

  # nil for a plain subject, false for a one-element array (unwrapped),
  # true when several filter subjects were given.
  multi_filter = case
                 when (subject.is_a?(Array) && subject.size == 1)
                   subject = subject.first
                   false
                 when (subject.is_a?(Array) && subject.size > 1)
                   true
                 end

  stream = if params[:stream].nil?
             if multi_filter
               # Resolve the stream of every filter subject.
               streams = subject.map do |s|
                 begin
                   find_stream_name_by_subject(s)
                 rescue NATS::JetStream::Error::NotFound
                   raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
                 end
               end

               # Ensure that the filter subjects are not ambiguous.
               streams.uniq!
               if streams.count > 1
                 raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
               end

               streams.first
             else
               find_stream_name_by_subject(subject)
             end
           else
             params[:stream]
           end

  begin
    consumer_info(stream, consumer)
  rescue NATS::JetStream::Error::NotFound => e
    # If attempting to bind, then this is a hard error.
    raise e if params[:stream] && !multi_filter

    # Otherwise create the consumer from the supplied (or default) config.
    config = if !params[:config]
               JetStream::API::ConsumerConfig.new
             elsif params[:config].is_a?(JetStream::API::ConsumerConfig)
               params[:config]
             else
               JetStream::API::ConsumerConfig.new(params[:config])
             end
    config[:durable_name] = durable
    config[:ack_policy] ||= JS::Config::AckExplicit
    if multi_filter
      config[:filter_subjects] ||= subject
    else
      config[:filter_subject] ||= subject
    end
    add_consumer(stream, config)
  end

  # Subscribe on a fresh inbox and attach the pull-subscription behavior.
  deliver = @nc.new_inbox
  sub = @nc.subscribe(deliver)
  sub.extend(PullSubscription)

  # API subject used to request the next messages from this consumer.
  next_subject = "#{@prefix}.CONSUMER.MSG.NEXT.#{stream}.#{consumer}"
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: consumer,
    nms: next_subject
  )
  sub
end

#subscribe(subject, params = {}, &cb) ⇒ NATS::JetStream::PushSubscription

subscribe binds or creates a push subscription to a JetStream push consumer.

Parameters:

  • subject (String, Array)

    Subject(s) from which the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PushSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PushSubscription will be bound.

  • :name (String)

    Name of the Consumer to which the PushSubscription will be bound.

  • :durable (String)

    Consumer durable name from where the messages will be fetched.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PushSubscription)


116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/nats/io/jetstream.rb', line 116

# Bind to, or create, a JetStream push consumer and subscribe to its
# delivery subject, returning a subscription extended with PushSubscription.
#
# When a block is given and +:manual_ack+ is not set, each message is
# acknowledged automatically after the block returns. Only
# JetStream::Error::MsgAlreadyAckd is ignored during that automatic ack;
# any other error raised by +msg.ack+ propagates to the caller of the
# callback.
#
# The caller's +params+ hash is only read, never modified.
# NOTE(review): +:consumer+ and +:name+ are documented options but are not
# consulted by this method; only +:durable+ / +:queue+ select an existing
# consumer — confirm intended behavior.
#
# @param subject [String, Array] Subject(s) from which the messages will be
#   fetched. A one-element array is treated as a single subject.
# @param params [Hash] Options to customize the PushSubscription.
# @option params [String] :stream Name of the Stream to which the consumer
#   belongs; looked up from the subject(s) when omitted.
# @option params [String] :durable Consumer durable name from where the
#   messages will be fetched.
# @option params [String] :queue Queue (deliver group) name; also used as the
#   durable name when no durable is given.
# @option params [Hash] :config Configuration for the consumer (a Hash or a
#   JetStream::API::ConsumerConfig), used when the consumer is created.
# @option params [Boolean] :manual_ack Disable the automatic ack after the callback.
# @option params :flow_control Value assigned to the created consumer's
#   +flow_control+ setting.
# @option params [Numeric] :idle_heartbeat Heartbeat interval, multiplied by
#   ::NATS::NANOSECONDS before being stored on the created consumer's config.
# @return [NATS::JetStream::PushSubscription]
# @raise [NATS::JetStream::Error] on ambiguous/unknown filter subjects, an
#   invalid config, or a queue / deliver-group mismatch.
def subscribe(subject, params={}, &cb)
  # nil for a plain subject, false for a one-element array (unwrapped),
  # true when several filter subjects were given.
  multi_filter = case
                 when (subject.is_a?(Array) && subject.size == 1)
                   subject = subject.first
                   false
                 when (subject.is_a?(Array) && subject.size > 1)
                   true
                 end

  stream = if params[:stream].nil?
             if multi_filter
               # Resolve the stream of every filter subject.
               streams = subject.map do |s|
                 begin
                   find_stream_name_by_subject(s)
                 rescue NATS::JetStream::Error::NotFound
                   raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
                 end
               end

               # Ensure that the filter subjects are not ambiguous.
               streams.uniq!
               if streams.count > 1
                 raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
               end

               streams.first
             else
               find_stream_name_by_subject(subject)
             end
           else
             params[:stream]
           end

  queue = params[:queue]
  durable = params[:durable]
  flow_control = params[:flow_control]
  manual_ack = params[:manual_ack]
  idle_heartbeat = params[:idle_heartbeat]
  config = params[:config]

  # A queue subscription uses the queue name as its durable name.
  if queue
    if durable && durable != queue
      raise NATS::JetStream::Error.new("nats: cannot create queue subscription '#{queue}' to consumer '#{durable}'")
    else
      durable = queue
    end
  end

  cinfo = nil
  consumer = nil
  consumer_found = false

  # Only a durable name can refer to an already existing consumer.
  if durable
    begin
      cinfo = consumer_info(stream, durable)
      config = cinfo.config
      consumer_found = true
      consumer = durable
    rescue NATS::JetStream::Error::NotFound
      consumer_found = false
    end
  end

  if consumer_found
    # Validate that the requested queue matches the consumer's deliver group.
    if !config.deliver_group
      if queue
        raise NATS::JetStream::Error.new("nats: cannot create a queue subscription for a consumer without a deliver group")
      elsif cinfo.push_bound
        raise NATS::JetStream::Error.new("nats: consumer is already bound to a subscription")
      end
    else
      if !queue
        raise NATS::JetStream::Error.new("nats: cannot create a subscription for a consumer with a deliver group #{config.deliver_group}")
      elsif queue != config.deliver_group
        raise NATS::JetStream::Error.new("nats: cannot create a queue subscription #{queue} for a consumer with a deliver group #{config.deliver_group}")
      end
    end
  else
    # Auto-create consumer if none found.
    if config.nil?
      # Defaults
      config = JetStream::API::ConsumerConfig.new({ack_policy: "explicit"})
    elsif config.is_a?(Hash)
      config = JetStream::API::ConsumerConfig.new(config)
    elsif !config.is_a?(JetStream::API::ConsumerConfig)
      raise NATS::JetStream::Error.new("nats: invalid ConsumerConfig")
    end

    config.durable_name = durable unless config.durable_name
    config.deliver_group = queue unless config.deliver_group

    # Create inbox for push consumer.
    deliver = @nc.new_inbox
    config.deliver_subject = deliver

    # Auto created consumers use the filter subject.
    if multi_filter
      config[:filter_subjects] ||= subject
    else
      config[:filter_subject] ||= subject
    end

    # Heartbeats / FlowControl
    # NOTE(review): this overwrites any flow_control already present in the
    # supplied config, even when the :flow_control option is absent — confirm.
    config.flow_control = flow_control
    if idle_heartbeat || config.idle_heartbeat
      # A heartbeat set in the config takes precedence over the option.
      idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat
      idle_heartbeat = idle_heartbeat * ::NATS::NANOSECONDS
      config.idle_heartbeat = idle_heartbeat
    end

    # Auto create the consumer.
    cinfo = add_consumer(stream, config)
    consumer = cinfo.name
  end

  # Enable auto acking for async callbacks unless disabled.
  if cb && !manual_ack
    user_cb = cb
    cb = proc do |msg|
      user_cb.call(msg)
      begin
        msg.ack
      rescue JetStream::Error::MsgAlreadyAckd
        # The callback acknowledged the message itself; nothing left to do.
      end
    end
  end

  sub = @nc.subscribe(config.deliver_subject, queue: config.deliver_group, &cb)
  sub.extend(PushSubscription)
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: consumer,
  )
  sub
end