Class: STAN::Subscription
- Inherits: Object
  - Object
    - STAN::Subscription
- Defined in: lib/stan/client.rb
Instance Attribute Summary
- #ack_inbox ⇒ Object
  Returns the value of attribute ack_inbox.
- #cb ⇒ Object (readonly)
  Returns the value of attribute cb.
- #durable_name ⇒ Object (readonly)
  Returns the value of attribute durable_name.
- #inbox ⇒ Object (readonly)
  Returns the value of attribute inbox.
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #sid ⇒ Object
  Returns the value of attribute sid.
- #stan ⇒ Object (readonly)
  Returns the value of attribute stan.
- #subject ⇒ Object (readonly)
  Returns the value of attribute subject.
Instance Method Summary
- #close ⇒ Object
- #initialize(subject, opts = {}, cb) ⇒ Subscription (constructor)
  A new instance of Subscription.
- #to_s ⇒ Object
- #unsubscribe ⇒ Object
  Unsubscribe removes interest in the subscription.
Constructor Details
#initialize(subject, opts = {}, cb) ⇒ Subscription
Returns a new instance of Subscription.
    # File 'lib/stan/client.rb', line 479
    def initialize(subject, opts={}, cb)
      @subject = subject
      @queue = opts[:queue]
      @inbox = STAN.create_inbox
      @sid = nil # inbox subscription sid
      @options = opts
      @cb = cb
      @ack_inbox = nil
      @stan = opts[:stan]
      @durable_name = opts[:durable_name]
    end
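Because `opts` is an optional parameter placed between two required ones, Ruby binds `subject` and `cb` first and only fills `opts` when a third argument is present. The sketch below illustrates that binding with a stand-in class. `SketchSubscription` is hypothetical and only mirrors the signature shown above, and the `SecureRandom` value is an assumed placeholder for whatever `STAN.create_inbox` returns, so the snippet runs without the gem.

```ruby
require "securerandom"

# Hypothetical stand-in mirroring the constructor signature above.
# It is not the library class.
class SketchSubscription
  attr_reader :subject, :queue, :inbox, :options, :cb, :durable_name

  def initialize(subject, opts = {}, cb)
    @subject = subject
    @queue = opts[:queue]
    @inbox = SecureRandom.hex(8) # placeholder for STAN.create_inbox (assumption)
    @options = opts
    @cb = cb
    @durable_name = opts[:durable_name]
  end
end

handler = ->(msg) { puts msg }

# Two arguments: subject and cb are bound, opts falls back to {}.
plain = SketchSubscription.new("orders", handler)

# Three arguments: the middle hash becomes opts.
durable = SketchSubscription.new(
  "orders",
  { queue: "workers", durable_name: "billing" },
  handler
)
```

With two arguments the callback is never mistaken for the options hash, which is why the callback can stay last in the signature.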
Instance Attribute Details
#ack_inbox ⇒ Object
Returns the value of attribute ack_inbox.
    # File 'lib/stan/client.rb', line 477
    def ack_inbox
      @ack_inbox
    end
#cb ⇒ Object (readonly)
Returns the value of attribute cb.
    # File 'lib/stan/client.rb', line 476
    def cb
      @cb
    end
#durable_name ⇒ Object (readonly)
Returns the value of attribute durable_name.
    # File 'lib/stan/client.rb', line 476
    def durable_name
      @durable_name
    end
#inbox ⇒ Object (readonly)
Returns the value of attribute inbox.
    # File 'lib/stan/client.rb', line 476
    def inbox
      @inbox
    end
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/stan/client.rb', line 476
    def options
      @options
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/stan/client.rb', line 476
    def queue
      @queue
    end
#sid ⇒ Object
Returns the value of attribute sid.
    # File 'lib/stan/client.rb', line 477
    def sid
      @sid
    end
#stan ⇒ Object (readonly)
Returns the value of attribute stan.
    # File 'lib/stan/client.rb', line 476
    def stan
      @stan
    end
#subject ⇒ Object (readonly)
Returns the value of attribute subject.
    # File 'lib/stan/client.rb', line 476
    def subject
      @subject
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/stan/client.rb', line 532
    def close
      synchronize do
        stan.nats.unsubscribe(self.sid)
      end

      # Make client stop tracking the subscription inbox
      # and grab close request subject under the lock.
      sub_close_subject = nil
      stan.synchronize do
        stan.sub_map.delete(self.ack_inbox)
        sub_close_subject = stan.sub_close_req_subject
      end

      sub_close_req = STAN::Protocol::UnsubscribeRequest.new({
        clientID: stan.client_id,
        subject: self.subject,
        inbox: self.ack_inbox
      })

      raw = stan.nats.request(sub_close_subject, sub_close_req.to_proto, {
        timeout: stan.options[:connect_timeout]
      })
      response = STAN::Protocol::SubscriptionResponse.decode(raw.data)
      unless response.error.empty?
        # FIXME: Error handling on unsubscribe/close
        raise Error.new(response.error)
      end
    end
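The method ends by turning a non-empty `error` field in the decoded response into an exception. A minimal sketch of that pattern follows. `SketchResponse` stands in for the decoded `STAN::Protocol::SubscriptionResponse`, `SketchError` for the library's `Error`, and `ensure_ok!` is a hypothetical helper name; none of them are part of the library, and the error text is made up.

```ruby
# Hypothetical stand-ins so the snippet runs without the gem.
SketchResponse = Struct.new(:error)
class SketchError < StandardError; end

# Raise when the response carries an error message,
# otherwise hand the response back to the caller.
def ensure_ok!(response)
  raise SketchError, response.error unless response.error.empty?
  response
end

# An empty error string means the request succeeded.
ok = ensure_ok!(SketchResponse.new(""))

# A non-empty error string surfaces as an exception the caller can rescue.
failure = nil
begin
  ensure_ok!(SketchResponse.new("example failure"))
rescue SketchError => e
  failure = e.message
end
```

A caller of `close` would wrap the call in a similar `begin`/`rescue` if it needs to keep running after a failed close request.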
#to_s ⇒ Object
    # File 'lib/stan/client.rb', line 491
    def to_s
      %Q(#<STAN::Subscription @subject="#{@subject}" @queue="#{@queue}" @durable_name="#{@durable_name}" @inbox="#{@inbox}" @ack_inbox="#{@ack_inbox}" @sid=#{@sid}>)
    end
#unsubscribe ⇒ Object
Unsubscribe removes interest in the subscription. For durable subscriptions, the durable interest is also removed from the server. Restarting a durable with the same name will not resume the subscription; it will be treated as a new one.
    # File 'lib/stan/client.rb', line 499
    def unsubscribe
      synchronize do
        stan.nats.unsubscribe(self.sid)
      end

      # Make client stop tracking the subscription inbox
      # and grab unsub request subject under the lock.
      unsub_subject = nil
      stan.synchronize do
        stan.sub_map.delete(self.ack_inbox)
        unsub_subject = stan.unsub_req_subject
      end

      unsub_req = STAN::Protocol::UnsubscribeRequest.new({
        clientID: stan.client_id,
        subject: self.subject,
        inbox: self.ack_inbox
      })

      if self.durable_name
        unsub_req.durableName = self.durable_name
      end

      raw = stan.nats.request(unsub_subject, unsub_req.to_proto, {
        timeout: stan.options[:connect_timeout]
      })
      response = STAN::Protocol::SubscriptionResponse.decode(raw.data)
      unless response.error.empty?
        # FIXME: Error handling on unsubscribe
        raise Error.new(response.error)
      end
    end
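The effect of `unsubscribe` on a durable can be illustrated with a toy in-memory model. `ToyServer` and its methods are hypothetical and only mimic the behaviour described for `unsubscribe`. That `close` leaves the durable's position on the server is an assumption drawn from general NATS Streaming semantics, not something this page states.

```ruby
# Toy model of server-side durable state; not part of the stan library.
class ToyServer
  def initialize
    @durables = {} # durable name => last acknowledged sequence
  end

  # Returns the sequence a durable subscriber would resume after.
  # An unknown durable name starts from 0, i.e. as a new subscription.
  def subscribe(durable_name)
    @durables[durable_name] ||= 0
  end

  def ack(durable_name, seq)
    @durables[durable_name] = seq
  end

  # close: the subscriber goes away, the durable position is kept (assumed).
  def close(durable_name)
    # Intentionally leaves @durables untouched.
  end

  # unsubscribe: the durable interest is removed from the server.
  def unsubscribe(durable_name)
    @durables.delete(durable_name)
  end
end

server = ToyServer.new
server.subscribe("billing")
server.ack("billing", 42)

# After close, subscribing again with the same name resumes.
server.close("billing")
resumed_from = server.subscribe("billing")

# After unsubscribe, the same name is treated as a new subscription.
server.unsubscribe("billing")
restarted_from = server.subscribe("billing")
```

In this model `resumed_from` keeps the acknowledged position while `restarted_from` falls back to the initial value, which is the distinction the description of `unsubscribe` draws.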