Class: STAN::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/stan/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subject, opts = {}, cb) ⇒ Subscription

Returns a new instance of Subscription.



479
480
481
482
483
484
485
486
487
488
489
# File 'lib/stan/client.rb', line 479

def initialize(subject, opts={}, cb)
  @subject = subject
  @queue = opts[:queue]
  @inbox = STAN.create_inbox
  @sid = nil # inbox subscription sid
  @options = opts
  @cb = cb
  @ack_inbox = nil
  @stan = opts[:stan]
  @durable_name = opts[:durable_name]
end

Instance Attribute Details

#ack_inboxObject

Returns the value of attribute ack_inbox.



477
478
479
# File 'lib/stan/client.rb', line 477

def ack_inbox
  @ack_inbox
end

#cbObject (readonly)

Returns the value of attribute cb.



476
477
478
# File 'lib/stan/client.rb', line 476

def cb
  @cb
end

#durable_nameObject (readonly)

Returns the value of attribute durable_name.



476
477
478
# File 'lib/stan/client.rb', line 476

def durable_name
  @durable_name
end

#inboxObject (readonly)

Returns the value of attribute inbox.



476
477
478
# File 'lib/stan/client.rb', line 476

def inbox
  @inbox
end

#optionsObject (readonly)

Returns the value of attribute options.



476
477
478
# File 'lib/stan/client.rb', line 476

def options
  @options
end

#queueObject (readonly)

Returns the value of attribute queue.



476
477
478
# File 'lib/stan/client.rb', line 476

def queue
  @queue
end

#sidObject

Returns the value of attribute sid.



477
478
479
# File 'lib/stan/client.rb', line 477

def sid
  @sid
end

#stanObject (readonly)

Returns the value of attribute stan.



476
477
478
# File 'lib/stan/client.rb', line 476

def stan
  @stan
end

#subjectObject (readonly)

Returns the value of attribute subject.



476
477
478
# File 'lib/stan/client.rb', line 476

def subject
  @subject
end

Instance Method Details

#closeObject



532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
# File 'lib/stan/client.rb', line 532

def close
  synchronize do
    stan.nats.unsubscribe(self.sid)
  end

  # Make client stop tracking the subscription inbox
  # and grab close request subject under the lock.
  sub_close_subject = nil
  stan.synchronize do
    stan.sub_map.delete(self.ack_inbox)
    sub_close_subject = stan.sub_close_req_subject
  end

  sub_close_req = STAN::Protocol::UnsubscribeRequest.new({
    clientID: stan.client_id,
    subject: self.subject,
    inbox: self.ack_inbox
  })

  raw = stan.nats.request(sub_close_subject, sub_close_req.to_proto, {
    timeout: stan.options[:connect_timeout]
  })
  response = STAN::Protocol::SubscriptionResponse.decode(raw.data)
  unless response.error.empty?
    # FIXME: Error handling on unsubscribe/close
    raise Error.new(response.error)
  end
end

#to_sObject



491
492
493
# File 'lib/stan/client.rb', line 491

def to_s
  %Q(#<STAN::Subscription @subject="#{@subject}" @queue="#{@queue}" @durable_name="#{@durable_name}" @inbox="#{@inbox}" @ack_inbox="#{@ack_inbox}" @sid=#{@sid}>)
end

#unsubscribeObject

Unsubscribe removes interest in the subscription. For durables, it means that the durable interest is also removed from the server. Restarting a durable with the same name will not resume the subscription, it will be considered a new one.



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
# File 'lib/stan/client.rb', line 499

def unsubscribe
  synchronize do
    stan.nats.unsubscribe(self.sid)
  end

  # Make client stop tracking the subscription inbox
  # and grab unsub request subject under the lock.
  unsub_subject = nil
  stan.synchronize do
    stan.sub_map.delete(self.ack_inbox)
    unsub_subject = stan.unsub_req_subject
  end

  unsub_req = STAN::Protocol::UnsubscribeRequest.new({
    clientID: stan.client_id,
    subject: self.subject,
    inbox: self.ack_inbox
  })

  if self.durable_name
    unsub_req.durableName = self.durable_name
  end

  raw = stan.nats.request(unsub_subject, unsub_req.to_proto, {
    timeout: stan.options[:connect_timeout]
  })
  response = STAN::Protocol::SubscriptionResponse.decode(raw.data)
  unless response.error.empty?
    # FIXME: Error handling on unsubscribe
    raise Error.new(response.error)
  end
end