Class: NATS::JetStream::API::ConsumerInfo
- Inherits:
-
Struct
- Object
- Struct
- NATS::JetStream::API::ConsumerInfo
- Defined in:
- lib/nats/io/jetstream/api.rb
Overview
ConsumerInfo is the current status of a JetStream consumer.
Instance Attribute Summary collapse
- #ack_floor ⇒ SequenceInfo
- #cluster ⇒ Hash
-
#config ⇒ ConsumerConfig
Consumer configuration.
-
#created ⇒ String
Time when the consumer was created.
- #delivered ⇒ SequenceInfo
-
#name ⇒ String
Name of the consumer.
- #num_ack_pending ⇒ Integer
- #num_pending ⇒ Integer
- #num_redelivered ⇒ Integer
- #num_waiting ⇒ Integer
-
#push_bound ⇒ Object
Returns the value of attribute push_bound.
-
#stream_name ⇒ String
Name of the stream to which the consumer belongs.
-
#type ⇒ Object
Returns the value of attribute type.
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ ConsumerInfo
constructor
A new instance of ConsumerInfo.
Constructor Details
#initialize(opts = {}) ⇒ ConsumerInfo
Returns a new instance of ConsumerInfo.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/nats/io/jetstream/api.rb', line 73 def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end |
Instance Attribute Details
#ack_floor ⇒ SequenceInfo
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#cluster ⇒ Hash
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#config ⇒ ConsumerConfig
Returns consumer configuration.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#created ⇒ String
Returns time when the consumer was created.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#delivered ⇒ SequenceInfo
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#name ⇒ String
Returns name of the consumer.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_ack_pending ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_pending ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_redelivered ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#num_waiting ⇒ Integer
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#push_bound ⇒ Object
Returns the value of attribute push_bound
68 69 70 |
# File 'lib/nats/io/jetstream/api.rb', line 68 def push_bound @push_bound end |
#stream_name ⇒ String
Returns name of the stream to which the consumer belongs.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/nats/io/jetstream/api.rb', line 68 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts = {}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config][:inactive_threshold] = opts[:config][:inactive_threshold] / ::NATS::NANOSECONDS if opts[:config][:inactive_threshold] opts[:config][:idle_heartbeat] = opts[:config][:idle_heartbeat] / ::NATS::NANOSECONDS if opts[:config][:idle_heartbeat] opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super freeze end end |
#type ⇒ Object
Returns the value of attribute type
68 69 70 |
# File 'lib/nats/io/jetstream/api.rb', line 68 def type @type end |