Class: LaunchDarkly::Impl::Integrations::TestData::TestDataSourceV2 Private
- Inherits: Object
- Includes: LaunchDarkly::Interfaces::DataSystem::Initializer, LaunchDarkly::Interfaces::DataSystem::Synchronizer
- Defined in: lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Internal implementation of both the Initializer and Synchronizer protocols for TestDataV2.
This component bridges the test data managed by TestDataV2 and the FDv2 protocol interfaces. Each instance registers with its TestDataV2 object on construction, so flag and segment changes are delivered to it as dynamic updates.
Instance Method Summary
- #fetch(selector_store) ⇒ LaunchDarkly::Result (private): Implementation of the Initializer.fetch method.
- #initialize(test_data) ⇒ TestDataSourceV2 (constructor, private): A new instance of TestDataSourceV2.
- #name ⇒ String (private): Return the name of this data source.
- #stop ⇒ void (private): Stop the data source and clean up resources.
- #sync(selector_store) {|LaunchDarkly::Interfaces::DataSystem::Update| ... } ⇒ void (private): Implementation of the Synchronizer.sync method.
- #upsert_flag(flag_data) ⇒ void (private): Called by TestDataV2 when a flag is updated.
- #upsert_segment(segment_data) ⇒ void (private): Called by TestDataV2 when a segment is updated.
Constructor Details
#initialize(test_data) ⇒ TestDataSourceV2
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of TestDataSourceV2, registered with test_data for change notifications.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 26

    def initialize(test_data)
      @test_data = test_data
      @closed = false
      @update_queue = Queue.new
      @lock = Mutex.new

      # Always register for change notifications
      @test_data.add_instance(self)
    end
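The registration step in the constructor follows an observer pattern. A minimal sketch of that pattern is below. `MiniTestData` and `MiniSource` are hypothetical stand-ins written for illustration; how TestDataV2 actually stores and notifies its instances is an assumption here, and only the method names `add_instance`, `closed_instance`, and `upsert_flag` come from the listing above.

```ruby
# Hypothetical stand-in for TestDataV2: keeps a list of registered sources.
class MiniTestData
  def initialize
    @instances = []
    @lock = Mutex.new
  end

  def add_instance(instance)
    @lock.synchronize { @instances << instance }
  end

  def closed_instance(instance)
    @lock.synchronize { @instances.delete(instance) }
  end

  # Notify every registered source about a flag change (assumed behavior).
  def update(flag_data)
    targets = @lock.synchronize { @instances.dup }
    targets.each { |instance| instance.upsert_flag(flag_data) }
  end
end

# Hypothetical stand-in for the data source: registers itself on construction.
class MiniSource
  attr_reader :received

  def initialize(test_data)
    @received = []
    test_data.add_instance(self)
  end

  def upsert_flag(flag_data)
    @received << flag_data
  end
end

test_data = MiniTestData.new
first = MiniSource.new(test_data)
second = MiniSource.new(test_data)

test_data.update({ key: 'flag-a' })
test_data.closed_instance(second) # second no longer receives updates
test_data.update({ key: 'flag-b' })
```

Copying the instance list before notifying keeps the lock from being held while callbacks run, which is one possible design choice rather than a description of the SDK.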
Instance Method Details
#fetch(selector_store) ⇒ LaunchDarkly::Result
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Implementation of the Initializer.fetch method.
Returns the current test data as a Basis for initial data loading.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 53

    def fetch(selector_store)
      begin
        @lock.synchronize do
          if @closed
            return LaunchDarkly::Result.fail('TestDataV2 source has been closed')
          end

          # Get all current flags and segments from test data
          init_data = @test_data.make_init_data
          version = @test_data.get_version

          # Build a full transfer changeset
          builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
          builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)

          # Add all flags to the changeset
          init_data[:flags].each do |key, flag_data|
            builder.add_put(
              LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
              key,
              flag_data[:version] || 1,
              flag_data
            )
          end

          # Add all segments to the changeset
          init_data[:segments].each do |key, segment_data|
            builder.add_put(
              LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
              key,
              segment_data[:version] || 1,
              segment_data
            )
          end

          # Create selector for this version
          selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
          change_set = builder.finish(selector)

          basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(change_set: change_set, persist: false, environment_id: nil)

          LaunchDarkly::Result.success(basis)
        end
      rescue => e
        LaunchDarkly::Result.fail("Error fetching test data: #{e.message}", e)
      end
    end
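The shape of the full-transfer changeset that #fetch assembles can be sketched with plain Ruby data. This is an illustration only: `Change`, `KINDS`, and `build_full_transfer` are hypothetical names, and the hash returned here is not the SDK's ChangeSet type. The one detail taken directly from the listing is the fallback to version 1 when an item carries no `:version`.

```ruby
# Hypothetical record standing in for one "put" entry in a changeset.
Change = Struct.new(:kind, :key, :version, :object)

# Maps the init-data section names to an item kind (assumed naming).
KINDS = { flags: :flag, segments: :segment }.freeze

def build_full_transfer(init_data)
  changes = []
  KINDS.each do |section, kind|
    init_data.fetch(section, {}).each do |key, data|
      # Items without an explicit :version default to 1, as in #fetch.
      changes << Change.new(kind, key, data[:version] || 1, data)
    end
  end
  { intent: :transfer_full, changes: changes }
end

init_data = {
  flags: {
    'flag-a' => { key: 'flag-a', version: 3 },
    'flag-b' => { key: 'flag-b' },
  },
  segments: {
    'seg-a' => { key: 'seg-a' },
  },
}

change_set = build_full_transfer(init_data)
```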
#name ⇒ String
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return the name of this data source.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 41

    def name
      'TestDataV2'
    end
#stop ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Stop the data source and clean up resources.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 163

    def stop
      @lock.synchronize do
        return if @closed
        @closed = true
      end

      @test_data.closed_instance(self)
      # Signal shutdown to sync generator
      @update_queue.push(nil)
    end
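The guard at the top of #stop makes repeated calls harmless: only the first call flips the flag and pushes the shutdown sentinel. A minimal sketch of that guard, using a hypothetical `SketchSource` class rather than the real data source, might look like this:

```ruby
# Hypothetical class illustrating an idempotent stop with a nil sentinel.
class SketchSource
  attr_reader :queue

  def initialize
    @closed = false
    @lock = Mutex.new
    @queue = Queue.new
  end

  def closed?
    @lock.synchronize { @closed }
  end

  def stop
    @lock.synchronize do
      # A second call returns here, so the sentinel is pushed only once.
      return if @closed
      @closed = true
    end
    @queue.push(nil)
  end
end

source = SketchSource.new
source.stop
source.stop # no effect: already closed
```

The `return` inside `synchronize` exits the whole method, and the mutex is released on the way out.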
#sync(selector_store) {|LaunchDarkly::Interfaces::DataSystem::Update| ... } ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Implementation of the Synchronizer.sync method.
Yields updates as test data changes occur.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 110

    def sync(selector_store)
      # First yield initial data
      initial_result = fetch(selector_store)
      unless initial_result.success?
        yield LaunchDarkly::Interfaces::DataSystem::Update.new(
          state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
          error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
            LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
            0,
            initial_result.error,
            Time.now
          )
        )
        return
      end

      # Yield the initial successful state
      yield LaunchDarkly::Interfaces::DataSystem::Update.new(
        state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
        change_set: initial_result.value.change_set
      )

      # Continue yielding updates as they arrive
      until @closed
        begin
          # stop() will push nil to the queue to wake us up when shutting down
          update = @update_queue.pop

          # Handle nil sentinel for shutdown
          break if update.nil?

          # Yield the actual update
          yield update
        rescue => e
          yield LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
            error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
              LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
              0,
              "Error in test data synchronizer: #{e.message}",
              Time.now
            )
          )
          break
        end
      end
    end
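Because #sync blocks on the queue until the source is stopped, a caller would normally run it on its own thread. The sketch below shows that consumption pattern with a hypothetical `MiniSynchronizer`; it uses symbols in place of Update objects and is not the SDK's data system wiring.

```ruby
# Hypothetical synchronizer: yields an initial snapshot, then queued updates.
class MiniSynchronizer
  def initialize
    @queue = Queue.new
  end

  def push(update)
    @queue.push(update)
  end

  # Pushes the nil sentinel so a blocked #sync wakes up and returns.
  def stop
    @queue.push(nil)
  end

  def sync
    yield :initial
    loop do
      update = @queue.pop # blocks until an update or the sentinel arrives
      break if update.nil?
      yield update
    end
  end
end

source = MiniSynchronizer.new
seen = []

# Run the blocking loop on a worker thread and collect what it yields.
worker = Thread.new { source.sync { |update| seen << update } }

source.push(:flag_changed)
source.stop
worker.join
```

The queue preserves order, so the worker sees the queued update before the sentinel regardless of when the thread starts running.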
#upsert_flag(flag_data) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Called by TestDataV2 when a flag is updated.
This method converts the flag update into an FDv2 changeset and queues it for delivery to the block passed to #sync.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 183

    def upsert_flag(flag_data)
      @lock.synchronize do
        return if @closed

        begin
          version = @test_data.get_version

          # Build a changes transfer changeset
          builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
          builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

          # Add the updated flag
          builder.add_put(
            LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
            flag_data[:key],
            flag_data[:version] || 1,
            flag_data
          )

          # Create selector for this version
          selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
          change_set = builder.finish(selector)

          # Queue the update
          update = LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
            change_set: change_set
          )

          @update_queue.push(update)
        rescue => e
          # Queue an error update
          error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
            error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
              LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
              0,
              "Error processing flag update: #{e.message}",
              Time.now
            )
          )
          @update_queue.push(error_update)
        end
      end
    end
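The listing above never raises to its caller: a failure while building the changeset is turned into an error update and queued like any other. A rough sketch of that convert-or-report behavior follows, using plain hashes. `build_change_update` and the hash keys are hypothetical and do not mirror the SDK's Update, Selector, or ErrorInfo classes.

```ruby
# Hypothetical helper: returns a "valid" update hash, or an "off" update
# describing the failure instead of raising.
def build_change_update(kind, data, version)
  {
    state: :valid,
    intent: :transfer_changes,
    # The selector pairs a state string with the numeric version (assumed shape).
    selector: { state: version.to_s, version: version },
    changes: [
      # Items without an explicit :version default to 1, as in #upsert_flag.
      { kind: kind, key: data[:key], version: data[:version] || 1, object: data },
    ],
  }
rescue => e
  { state: :off, error: "Error processing #{kind} update: #{e.message}" }
end

queue = Queue.new
queue.push(build_change_update(:flag, { key: 'flag-a' }, 7))
queue.push(build_change_update(:flag, nil, 8)) # nil data triggers the error path

ok_update = queue.pop
error_update = queue.pop
```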
#upsert_segment(segment_data) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Called by TestDataV2 when a segment is updated.
This method converts the segment update into an FDv2 changeset and queues it for delivery to the block passed to #sync.
    # File 'lib/ldclient-rb/impl/integrations/test_data/test_data_source_v2.rb', line 238

    def upsert_segment(segment_data)
      @lock.synchronize do
        return if @closed

        begin
          version = @test_data.get_version

          # Build a changes transfer changeset
          builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
          builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)

          # Add the updated segment
          builder.add_put(
            LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
            segment_data[:key],
            segment_data[:version] || 1,
            segment_data
          )

          # Create selector for this version
          selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
          change_set = builder.finish(selector)

          # Queue the update
          update = LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
            change_set: change_set
          )

          @update_queue.push(update)
        rescue => e
          # Queue an error update
          error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
            error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
              LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
              0,
              "Error processing segment update: #{e.message}",
              Time.now
            )
          )
          @update_queue.push(error_update)
        end
      end
    end