Class: Mongo::Collection::View::ChangeStream
- Inherits:
-
Aggregation
- Object
- Aggregation
- Mongo::Collection::View::ChangeStream
- Includes:
- Retryable
- Defined in:
- lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb
Overview
Only available in server versions 3.6 and higher.
ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.
Provides behaviour around a ‘$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.
Defined Under Namespace
Modules: Retryable
Constant Summary collapse
- FULL_DOCUMENT_DEFAULT =
Returns The fullDocument option default value.
'default'.freeze
- DATABASE =
Returns Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.
:database- CLUSTER =
Returns Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.
:cluster
Constants inherited from Aggregation
Constants included from Loggable
Constants included from Explainable
Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER
Instance Attribute Summary collapse
-
#options ⇒ BSON::Document
readonly
The change stream options.
Attributes inherited from Aggregation
Instance Method Summary collapse
-
#close ⇒ nil
Close the change stream.
-
#closed? ⇒ true, false
Is the change stream closed?.
-
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
-
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
constructor
Initialize the change stream for the provided collection view, pipeline and options.
-
#inspect ⇒ String
Get a formatted string for use in inspection.
- #to_enum ⇒ Object
-
#try_next ⇒ BSON::Document | nil
private
Return one document from the change stream, if one is available.
Methods inherited from Aggregation
Methods included from Retryable
#read_with_one_retry, #read_with_retry, #write_with_retry
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Explainable
Methods included from Iterable
Constructor Details
#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream
Initialize the change stream for the provided collection view, pipeline and options.
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/mongo/collection/view/change_stream.rb', line 86 def initialize(view, pipeline, changes_for, = {}) @view = view @changes_for = changes_for @change_stream_filters = pipeline && pipeline.dup = && .dup.freeze @resume_token = [:resume_after] create_cursor! # We send different parameters when we resume a change stream # compared to when we send the first query @resuming = true end |
Instance Attribute Details
#options ⇒ BSON::Document (readonly)
Returns The change stream options.
58 59 60 |
# File 'lib/mongo/collection/view/change_stream.rb', line 58 def end |
Instance Method Details
#close ⇒ nil
Close the change stream.
203 204 205 206 207 208 |
# File 'lib/mongo/collection/view/change_stream.rb', line 203 def close unless closed? begin; @cursor.send(:kill_cursors); rescue; end @cursor = nil end end |
#closed? ⇒ true, false
Is the change stream closed?
218 219 220 |
# File 'lib/mongo/collection/view/change_stream.rb', line 218 def closed? @cursor.nil? end |
#each {|Each| ... } ⇒ Enumerator
Iterate through documents returned by the change stream.
This method retries once per error on resumable errors (two consecutive errors result in the second error being raised, an error which is recovered from resets the error count to zero).
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/mongo/collection/view/change_stream.rb', line 115 def each raise StopIteration.new if closed? retried = false begin @cursor.each do |doc| cache_resume_token(doc) yield doc end if block_given? @cursor.to_enum rescue Mongo::Error => e if retried || !e.change_stream_resumable? raise end retried = true # Rerun initial aggregation. # Any errors here will stop iteration and break out of this # method close create_cursor! retry end end |
#inspect ⇒ String
Get a formatted string for use in inspection.
230 231 232 233 |
# File 'lib/mongo/collection/view/change_stream.rb', line 230 def inspect "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " + "options=#{@options} resume_token=#{@resume_token}>" end |
#to_enum ⇒ Object
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/mongo/collection/view/change_stream.rb', line 184 def to_enum enum = super enum.send(:instance_variable_set, '@obj', self) class << enum def try_next @obj.try_next end end enum end |
#try_next ⇒ BSON::Document | nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method is experimental and subject to change.
Return one document from the change stream, if one is available.
Retries once on a resumable error.
Raises StopIteration if the change stream is closed.
This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/mongo/collection/view/change_stream.rb', line 153 def try_next raise StopIteration.new if closed? retried = false begin doc = @cursor.try_next rescue Mongo::Error => e unless e.change_stream_resumable? raise end if retried # Rerun initial aggregation. # Any errors here will stop iteration and break out of this # method close create_cursor! retried = false else # Attempt to retry a getMore once retried = true retry end end if doc cache_resume_token(doc) end doc end |