Class: NATS::JetStream

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/js.rb

Overview

JetStream returns a context with a similar API as the NATS::Client but with enhanced functions to persist and consume messages from the NATS JetStream engine.

Examples:

nc = NATS.connect("demo.nats.io")
js = nc.jetstream()

Defined Under Namespace

Modules: API, Manager, Msg Classes: Error, PubAck

Instance Method Summary collapse

Constructor Details

#initialize(conn, params = {}) ⇒ JetStream

Create a new JetStream context for a NATS connection.

Parameters:

  • conn (NATS::Client)
  • params (Hash) (defaults to: {})

    Options to customize JetStream context.

Options Hash (params):

  • :prefix (String)

    JetStream API prefix to use for the requests.

  • :domain (String)

    JetStream Domain to use for the requests.

  • :timeout (Float)

    Default timeout to use for JS requests.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/nats/io/js.rb', line 39

def initialize(conn, params={})
  @nc = conn
  @prefix = if params[:prefix]
              params[:prefix]
            elsif params[:domain]
              "$JS.#{params[:domain]}.API"
            else
              JS::DefaultAPIPrefix
            end
  @opts = params
  @opts[:timeout] ||= 5 # seconds
  params[:prefix] = @prefix

  # Include JetStream::Manager
  extend Manager
  extend KeyValue::Manager
end

Instance Method Details

#publish(subject, payload = "", **params) ⇒ PubAck

publish produces a message for JetStream.

Parameters:

  • subject (String)

    The subject from a stream where the message will be sent.

  • payload (String) (defaults to: "")

    The payload of the message.

  • params (Hash)

    Options to customize the publish message request.

Options Hash (**params):

  • :timeout (Float)

    Time to wait for an PubAck response or an error.

  • :header (Hash)

    NATS Headers to use for the message.

  • :stream (String)

    Expected Stream to which the message is being published.

Returns:

  • (PubAck)

    The pub ack response.

Raises:

  • (NATS::Timeout)

    When it takes too long to receive an ack response.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/nats/io/js.rb', line 79

def publish(subject, payload="", **params)
  params[:timeout] ||= @opts[:timeout]
  if params[:stream]
    params[:header] ||= {}
    params[:header][JS::Header::ExpectedStream] = params[:stream]
  end

  # Send message with headers.
  msg = NATS::Msg.new(subject: subject,
                      data: payload,
                      header: params[:header])

  begin
    resp = @nc.request_msg(msg, **params)
    result = JSON.parse(resp.data, symbolize_names: true)
  rescue ::NATS::IO::NoRespondersError
    raise JetStream::Error::NoStreamResponse.new("nats: no response from stream")
  end
  raise JS.from_error(result[:error]) if result[:error]

  PubAck.new(result)
end

#pull_subscribe(subject, durable, params = {}) ⇒ NATS::JetStream::PullSubscription

pull_subscribe binds or creates a subscription to a JetStream pull consumer.

Parameters:

  • subject (String)

    Subject from which the messages will be fetched.

  • durable (String)

    Consumer durable name from where the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PullSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PullSubscription will be bound.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PullSubscription)

Raises:



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/nats/io/js.rb', line 224

def pull_subscribe(subject, durable, params={})
  raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") if durable.empty?
  params[:consumer] ||= durable
  stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]

  begin
    consumer_info(stream, params[:consumer])
  rescue NATS::JetStream::Error::NotFound => e
    # If attempting to bind, then this is a hard error.
    raise e if params[:stream]

    config = if not params[:config]
               JetStream::API::ConsumerConfig.new
             elsif params[:config].is_a?(JetStream::API::ConsumerConfig)
               params[:config]
             else
               JetStream::API::ConsumerConfig.new(params[:config])
             end
    config[:durable_name] = durable
    config[:ack_policy] ||= JS::Config::AckExplicit
    add_consumer(stream, config)
  end

  deliver = @nc.new_inbox
  sub = @nc.subscribe(deliver)
  sub.extend(PullSubscription)

  consumer = params[:consumer]
  subject = "#{@prefix}.CONSUMER.MSG.NEXT.#{stream}.#{consumer}"
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: params[:consumer],
    nms: subject
  )
  sub
end

#subscribe(subject, params = {}, &cb) ⇒ NATS::JetStream::PushSubscription

subscribe binds or creates a push subscription to a JetStream pull consumer.

Parameters:

  • subject (String)

    Subject from which the messages will be fetched.

  • params (Hash) (defaults to: {})

    Options to customize the PushSubscription.

Options Hash (params):

  • :stream (String)

    Name of the Stream to which the consumer belongs.

  • :consumer (String)

    Name of the Consumer to which the PushSubscription will be bound.

  • :durable (String)

    Consumer durable name from where the messages will be fetched.

  • :config (Hash)

    Configuration for the consumer.

Returns:

  • (NATS::JetStream::PushSubscription)


111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/nats/io/js.rb', line 111

def subscribe(subject, params={}, &cb)
  params[:consumer] ||= params[:durable]
  stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]

  queue = params[:queue]
  durable = params[:durable]
  flow_control = params[:flow_control]
  manual_ack = params[:manual_ack]
  idle_heartbeat = params[:idle_heartbeat]
  flow_control = params[:flow_control]

  if queue
    if durable and durable != queue
      raise NATS::JetStream::Error.new("nats: cannot create queue subscription '#{queue}' to consumer '#{durable}'")
    else
      durable = queue
    end
  end

  cinfo = nil
  consumer_found = false
  should_create = false

  if not durable
    should_create = true
  else
    begin
      cinfo = consumer_info(stream, durable)
      config = cinfo.config
      consumer_found = true
      consumer = durable
    rescue NATS::JetStream::Error::NotFound
      should_create = true
      consumer_found = false
    end
  end

  if consumer_found
    if not config.deliver_group
      if queue
        raise NATS::JetStream::Error.new("nats: cannot create a queue subscription for a consumer without a deliver group")
      elsif cinfo.push_bound
        raise NATS::JetStream::Error.new("nats: consumer is already bound to a subscription")
      end
    else
      if not queue
        raise NATS::JetStream::Error.new("nats: cannot create a subscription for a consumer with a deliver group #{config.deliver_group}")
      elsif queue != config.deliver_group
        raise NATS::JetStream::Error.new("nats: cannot create a queue subscription #{queue} for a consumer with a deliver group #{config.deliver_group}")
      end
    end
  elsif should_create
    # Auto-create consumer if none found.
    if config.nil?
      # Defaults
      config = JetStream::API::ConsumerConfig.new({ack_policy: "explicit"})
    elsif config.is_a?(Hash)
      config = JetStream::API::ConsumerConfig.new(config)
    elsif !config.is_a?(JetStream::API::ConsumerConfig)
      raise NATS::JetStream::Error.new("nats: invalid ConsumerConfig")
    end

    config.durable_name = durable if not config.durable_name
    config.deliver_group = queue if not config.deliver_group

    # Create inbox for push consumer.
    deliver = @nc.new_inbox
    config.deliver_subject = deliver

    # Auto created consumers use the filter subject.
    config.filter_subject = subject

    # Heartbeats / FlowControl
    config.flow_control = flow_control
    if idle_heartbeat or config.idle_heartbeat
      idle_heartbeat = config.idle_heartbeat if config.idle_heartbeat
      idle_heartbeat = idle_heartbeat * 1_000_000_000
      config.idle_heartbeat = idle_heartbeat
    end

    # Auto create the consumer.
    cinfo = add_consumer(stream, config)
    consumer = cinfo.name
  end

  # Enable auto acking for async callbacks unless disabled.
  if cb and not manual_ack
    ocb = cb
    new_cb = proc do |msg|
      ocb.call(msg)
      msg.ack rescue JetStream::Error::MsgAlreadyAckd
    end
    cb = new_cb
  end
  sub = @nc.subscribe(config.deliver_subject, queue: config.deliver_group, &cb)
  sub.extend(PushSubscription)
  sub.jsi = JS::Sub.new(
    js: self,
    stream: stream,
    consumer: consumer,
  )
  sub
end