Class: Tasker::Telemetry::ExportCoordinator
- Inherits: Object
  - Object
  - Tasker::Telemetry::ExportCoordinator
- Includes: Singleton, Concerns::StructuredLogging
- Defined in: lib/tasker/telemetry/export_coordinator.rb
Overview
ExportCoordinator manages export coordination and integrates with PluginRegistry.
This class now uses the unified PluginRegistry for plugin management, eliminating duplication and providing consistent plugin handling across the telemetry system.
Constant Summary
Constants included from Concerns::StructuredLogging
Concerns::StructuredLogging::CORRELATION_ID_KEY
Instance Method Summary
- #clear_plugins! ⇒ Boolean
  Legacy method for backward compatibility (now delegates to PluginRegistry).
- #coordinate_cache_sync(sync_result) ⇒ Object
  Coordinate cache sync event.
- #coordinate_export_completion(export_id, result) ⇒ Object
  Coordinate export completion.
- #coordinate_export_request(export_format, options = {}) ⇒ String
  Coordinate export request.
- #execute_coordinated_export(format:, include_instances: false, **options) ⇒ Hash
  Execute coordinated export with distributed locking and plugin coordination.
- #extend_cache_ttl(extension_duration) ⇒ Object
  Extend cache TTL for distributed scenarios.
- #initialize ⇒ ExportCoordinator (constructor)
  A new instance of ExportCoordinator.
- #plugins_for_format(format) ⇒ Array<Object>
  Get plugins that support a specific format.
- #register_plugin(name, plugin, replace: false, **options) ⇒ Boolean
  Register a plugin for export coordination (delegates to PluginRegistry).
- #registered_plugins ⇒ Hash
  Get registered plugins (delegates to PluginRegistry).
- #stats ⇒ Hash
  Get comprehensive export coordination statistics.
- #supported_formats ⇒ Array<String>
  Get all supported formats across registered plugins.
- #supports_format? ⇒ Boolean
  Check if a format is supported by any registered plugin.
- #unregister_plugin(name) ⇒ Boolean
  Unregister a plugin (delegates to PluginRegistry).
Methods included from Concerns::StructuredLogging
#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id
Constructor Details
#initialize ⇒ ExportCoordinator
Returns a new instance of ExportCoordinator.
# File 'lib/tasker/telemetry/export_coordinator.rb', line 18
def initialize
  @plugin_registry = PluginRegistry.instance
  @event_bus = Tasker::Events::Publisher.instance
  @mutex = Mutex.new

  log_structured(:info, 'ExportCoordinator initialized',
                 entity_type: 'export_coordinator',
                 event_type: :initialized)
end
Instance Method Details
#clear_plugins! ⇒ Boolean
Legacy method for backward compatibility (now delegates to PluginRegistry)
# File 'lib/tasker/telemetry/export_coordinator.rb', line 341
def clear_plugins!
  @plugin_registry.clear!
end
#coordinate_cache_sync(sync_result) ⇒ Object
Coordinate cache sync event
# File 'lib/tasker/telemetry/export_coordinator.rb', line 117
def coordinate_cache_sync(sync_result)
  publish_event(Events::ExportEvents::CACHE_SYNCED, {
    strategy: sync_result[:strategy],
    metrics_count: sync_result[:metrics_count],
    duration_ms: sync_result[:duration_ms],
    success: sync_result[:success],
    timestamp: Time.current.iso8601
  })

  # Notify plugins of cache sync
  notify_plugins(:on_cache_sync, sync_result)
end
#coordinate_export_completion(export_id, result) ⇒ Object
Coordinate export completion
# File 'lib/tasker/telemetry/export_coordinator.rb', line 159
def coordinate_export_completion(export_id, result)
  if result[:success]
    publish_event(Events::ExportEvents::EXPORT_COMPLETED, {
      export_id: export_id,
      format: result[:format],
      metrics_count: result[:metrics_count],
      duration_ms: result[:duration_ms],
      timestamp: Time.current.iso8601
    })
  else
    publish_event(Events::ExportEvents::EXPORT_FAILED, {
      export_id: export_id,
      format: result[:format],
      error: result[:error],
      timestamp: Time.current.iso8601
    })
  end

  # Notify plugins of export completion
  notify_plugins(:on_export_complete, {
    export_id: export_id,
    result: result
  })
end
#coordinate_export_request(export_format, options = {}) ⇒ String
Coordinate export request
# File 'lib/tasker/telemetry/export_coordinator.rb', line 135
def coordinate_export_request(export_format, options = {})
  export_id = SecureRandom.uuid

  publish_event(Events::ExportEvents::EXPORT_REQUESTED, {
    export_id: export_id,
    format: export_format,
    options: options,
    timestamp: Time.current.iso8601
  })

  # Notify plugins of export request
  notify_plugins(:on_export_request, {
    export_id: export_id,
    format: export_format,
    options: options
  })

  export_id
end
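The coordination methods above notify plugins through three hooks: `on_cache_sync`, `on_export_complete`, and `on_export_request`. The sketch below shows what a plugin implementing those hooks might look like. Only the hook names and payload keys come from the listings on this page. `RecordingExportPlugin`, `notify_plugins_sketch`, and the sample payload values are hypothetical, and the real `notify_plugins` is private and not shown here, so the `respond_to?` dispatch is an assumption.

```ruby
# Hypothetical plugin: the class name and the recorded-event list are
# illustrative; only the three hook names come from the coordinator source.
class RecordingExportPlugin
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Payload keys mirror coordinate_export_request: export_id, format, options.
  def on_export_request(payload)
    @seen << [:request, payload[:export_id], payload[:format]]
  end

  # Payload keys mirror coordinate_export_completion: export_id, result.
  def on_export_complete(payload)
    @seen << [:complete, payload[:export_id], payload[:result][:success]]
  end

  # coordinate_cache_sync passes the sync_result hash through unchanged.
  def on_cache_sync(sync_result)
    @seen << [:cache_sync, sync_result[:strategy], sync_result[:metrics_count]]
  end
end

# Assumed dispatch: call a hook only on plugins that define it.
# This is a stand-in for the private notify_plugins, not its actual body.
def notify_plugins_sketch(plugins, hook, payload)
  plugins.each do |plugin|
    plugin.public_send(hook, payload) if plugin.respond_to?(hook)
  end
end

plugin = RecordingExportPlugin.new
plugins = [plugin, Object.new] # the bare Object defines no hooks and is skipped

notify_plugins_sketch(plugins, :on_export_request,
                      { export_id: 'abc-123', format: 'json', options: {} })
notify_plugins_sketch(plugins, :on_export_complete,
                      { export_id: 'abc-123', result: { success: true, format: 'json' } })
notify_plugins_sketch(plugins, :on_cache_sync,
                      { strategy: :distributed, metrics_count: 42, duration_ms: 3.1, success: true })
```

A plugin that only cares about one event can define just that hook. Under the assumed dispatch, the others are silently skipped.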
#execute_coordinated_export(format:, include_instances: false, **options) ⇒ Hash
Execute coordinated export with distributed locking and plugin coordination
# File 'lib/tasker/telemetry/export_coordinator.rb', line 190
def execute_coordinated_export(format:, include_instances: false, **options)
  correlation_id = SecureRandom.uuid
  start_time = Time.current

  log_structured(
    :info,
    'Starting coordinated export',
    entity_type: 'export_coordination',
    entity_id: correlation_id,
    format: format,
    include_instances: include_instances,
    options: options
  )

  begin
    # Check if format is supported
    unless supports_format?(format)
      return {
        success: false,
        error: "Unsupported export format: #{format}",
        supported_formats: supported_formats
      }
    end

    # Publish export requested event
    @event_bus.publish(
      Tasker::Telemetry::Events::ExportEvents::EXPORT_REQUESTED,
      {
        correlation_id: correlation_id,
        format: format,
        include_instances: include_instances,
        options: options,
        timestamp: start_time
      }
    )

    # Get metrics data from backend
    metrics_backend = Tasker::Telemetry::MetricsBackend.instance
    metrics_data = if include_instances
                     metrics_backend.export_distributed_metrics
                   else
                     metrics_backend.export_metrics
                   end

    # Execute plugin-based export using PluginRegistry
    export_result = coordinate_plugin_export(format, metrics_data, correlation_id)

    duration_ms = ((Time.current - start_time) * 1000).round(2)

    if export_result[:success]
      result = {
        success: true,
        format: format,
        data: export_result[:data],
        metrics_count: metrics_data.size,
        duration_ms: duration_ms,
        correlation_id: correlation_id
      }

      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::EXPORT_COMPLETED,
        result.merge(timestamp: Time.current)
      )

      log_structured(
        :info,
        'Coordinated export completed successfully',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format,
        metrics_count: result[:metrics_count],
        duration_ms: duration_ms
      )
    else
      result = {
        success: false,
        format: format,
        error: export_result[:error],
        duration_ms: duration_ms,
        correlation_id: correlation_id
      }

      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::EXPORT_FAILED,
        result.merge(timestamp: Time.current)
      )

      log_structured(
        :error,
        'Coordinated export failed',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format,
        error: export_result[:error],
        duration_ms: duration_ms
      )
    end

    result
  rescue StandardError => e
    duration_ms = ((Time.current - start_time) * 1000).round(2)

    result = {
      success: false,
      format: format,
      error: e.message,
      duration_ms: duration_ms,
      correlation_id: correlation_id
    }

    @event_bus.publish(
      Tasker::Telemetry::Events::ExportEvents::EXPORT_FAILED,
      result.merge(timestamp: Time.current)
    )

    log_structured(
      :error,
      'Coordinated export exception',
      entity_type: 'export_coordination',
      entity_id: correlation_id,
      format: format,
      error: e.message,
      error_class: e.class.name,
      duration_ms: duration_ms
    )

    result
  end
end
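The listing above returns one of three hash shapes: a success hash, a failure hash, or an early "unsupported format" hash that carries `supported_formats` but no `correlation_id` or `duration_ms`. A caller might branch on them as in the following sketch. The helper name `summarize_export_result` and the sample values are hypothetical. The keys are the ones built in the listing.

```ruby
# Summarize a result hash shaped like the ones built in
# execute_coordinated_export. The helper name is hypothetical.
def summarize_export_result(result)
  if result[:success]
    "exported #{result[:metrics_count]} metrics as #{result[:format]} " \
      "in #{result[:duration_ms]}ms (correlation #{result[:correlation_id]})"
  elsif result.key?(:supported_formats)
    # Early-return shape: only success, error, and supported_formats are set.
    "unsupported format; try one of: #{result[:supported_formats].join(', ')}"
  else
    "export failed: #{result[:error]}"
  end
end

# Sample hashes (illustrative values, not real export output).
ok = { success: true, format: 'json', data: '{}', metrics_count: 12,
       duration_ms: 4.25, correlation_id: 'c-1' }
unsupported = { success: false, error: 'Unsupported export format: xml',
                supported_formats: %w[json csv] }
failed = { success: false, format: 'json', error: 'boom',
           duration_ms: 1.0, correlation_id: 'c-2' }

puts summarize_export_result(ok)
puts summarize_export_result(unsupported)
puts summarize_export_result(failed)
```

Because the unsupported-format branch returns before any event is published, code that correlates results with `EXPORT_REQUESTED` events should not expect a `correlation_id` in that case.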
#extend_cache_ttl(extension_duration) ⇒ Object
Extend cache TTL for distributed scenarios
# File 'lib/tasker/telemetry/export_coordinator.rb', line 348
def extend_cache_ttl(extension_duration)
  correlation_id = SecureRandom.uuid

  log_structured(
    :info,
    'Extending cache TTL for distributed export',
    entity_type: 'cache_coordination',
    entity_id: correlation_id,
    extension_duration: extension_duration
  )

  begin
    # Get cache backend and extend TTL
    cache_backend = Tasker::Telemetry::CacheBackend.instance
    result = cache_backend.extend_ttl(extension_duration)

    if result[:success]
      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::CACHE_TTL_EXTENDED,
        {
          correlation_id: correlation_id,
          extension_duration: extension_duration,
          new_ttl: result[:new_ttl],
          timestamp: Time.current
        }
      )

      log_structured(
        :info,
        'Cache TTL extended successfully',
        entity_type: 'cache_coordination',
        entity_id: correlation_id,
        extension_duration: extension_duration,
        new_ttl: result[:new_ttl]
      )
    else
      log_structured(
        :warn,
        'Failed to extend cache TTL',
        entity_type: 'cache_coordination',
        entity_id: correlation_id,
        extension_duration: extension_duration,
        error: result[:error]
      )
    end

    result
  rescue StandardError => e
    log_structured(
      :error,
      'Cache TTL extension exception',
      entity_type: 'cache_coordination',
      entity_id: correlation_id,
      extension_duration: extension_duration,
      error: e.message,
      error_class: e.class.name
    )

    {
      success: false,
      error: e.message
    }
  end
end
#plugins_for_format(format) ⇒ Array<Object>
Get plugins that support a specific format
# File 'lib/tasker/telemetry/export_coordinator.rb', line 99
def plugins_for_format(format)
  @plugin_registry.find_by(format: format)
end
#register_plugin(name, plugin, replace: false, **options) ⇒ Boolean
Register a plugin for export coordination (delegates to PluginRegistry)
# File 'lib/tasker/telemetry/export_coordinator.rb', line 35
def register_plugin(name, plugin, replace: false, **options)
  # Register with the unified PluginRegistry
  result = @plugin_registry.register(name, plugin, replace: replace, **options)

  if result
    # Publish coordination event
    publish_event(Events::ExportEvents::PLUGIN_REGISTERED, {
      plugin_name: name.to_s,
      plugin_class: plugin.class.name,
      options: options.merge(replace: replace),
      timestamp: Time.current.iso8601
    })

    log_structured(:info, 'Export plugin registered via coordinator',
                   entity_type: 'export_plugin',
                   entity_id: name.to_s,
                   plugin_name: name.to_s,
                   plugin_class: plugin.class.name,
                   options: options.merge(replace: replace),
                   event_type: :registered)
  end

  result
end
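The registration flow above (delegate to the registry, then publish an event only when the registry reports success) can be tried in isolation with stand-ins. In this sketch `SketchRegistry` and `SketchCoordinator` are hypothetical and are not the real PluginRegistry or ExportCoordinator. Refusing a duplicate name unless `replace: true` is an assumption about what `replace:` means, and `priority:` is an invented option used only to show how extra options are merged into the event payload.

```ruby
# Stand-in registry: NOT the real PluginRegistry. Refusing duplicates unless
# replace: true is an assumption made for this sketch.
class SketchRegistry
  def initialize
    @plugins = {}
  end

  def register(name, plugin, replace: false, **_options)
    key = name.to_s
    return false if @plugins.key?(key) && !replace

    @plugins[key] = plugin
    true
  end
end

# Stand-in coordinator that records events in an array instead of
# publishing them to an event bus.
class SketchCoordinator
  attr_reader :events

  def initialize(registry)
    @registry = registry
    @events = []
  end

  def register_plugin(name, plugin, replace: false, **options)
    result = @registry.register(name, plugin, replace: replace, **options)

    # Mirror the listing: emit the event only when registration succeeded.
    if result
      @events << { plugin_name: name.to_s,
                   plugin_class: plugin.class.name,
                   options: options.merge(replace: replace) }
    end

    result
  end
end

coordinator = SketchCoordinator.new(SketchRegistry.new)
first = coordinator.register_plugin(:json, Object.new, priority: 1)
second = coordinator.register_plugin(:json, Object.new)
third = coordinator.register_plugin(:json, Object.new, replace: true)
```

Under these assumptions the second call returns false and produces no event, so listeners only ever see registrations that took effect.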
#registered_plugins ⇒ Hash
Get registered plugins (delegates to PluginRegistry)
# File 'lib/tasker/telemetry/export_coordinator.rb', line 91
def registered_plugins
  @plugin_registry.all_plugins
end
#stats ⇒ Hash
Get comprehensive export coordination statistics
# File 'lib/tasker/telemetry/export_coordinator.rb', line 323
def stats
  plugin_stats = @plugin_registry.stats

  {
    export_coordinator: {
      initialized_at: @initialized_at || Time.current,
      total_plugins: plugin_stats[:total_plugins],
      supported_formats: plugin_stats[:supported_formats],
      plugins_by_format: plugin_stats[:plugins_by_format],
      average_formats_per_plugin: plugin_stats[:average_formats_per_plugin]
    },
    plugin_registry_stats: plugin_stats
  }
end
#supported_formats ⇒ Array<String>
Get all supported formats across registered plugins
# File 'lib/tasker/telemetry/export_coordinator.rb', line 112
delegate :supported_formats, to: :@plugin_registry
#supports_format? ⇒ Boolean
Check if a format is supported by any registered plugin
# File 'lib/tasker/telemetry/export_coordinator.rb', line 107
delegate :supports_format?, to: :@plugin_registry
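Both `supports_format?` and `supported_formats` are forwarded to the registry with `delegate`, which is why the generated signature for `supports_format?` shows no parameter. The call `supports_format?(format)` in `execute_coordinated_export` suggests it takes the format as its single argument. The sketch below reproduces the same forwarding with Ruby's standard `Forwardable` module, so it runs without ActiveSupport. `SketchFormatRegistry` and `SketchFormatCoordinator` are hypothetical stand-ins, and the string-based format matching is an assumption.

```ruby
require 'forwardable'

# Stand-in registry: NOT the real PluginRegistry. Formats are stored as
# strings, which is an assumption made for this sketch.
class SketchFormatRegistry
  def initialize(formats)
    @formats = formats.map(&:to_s)
  end

  def supported_formats
    @formats.dup
  end

  def supports_format?(format)
    @formats.include?(format.to_s)
  end
end

class SketchFormatCoordinator
  extend Forwardable

  # Plain-Ruby counterpart of `delegate ..., to: :@plugin_registry`:
  # both methods are forwarded, arguments included, to the instance variable.
  def_delegators :@plugin_registry, :supports_format?, :supported_formats

  def initialize(registry)
    @plugin_registry = registry
  end
end

coordinator = SketchFormatCoordinator.new(SketchFormatRegistry.new(%w[json csv]))
json_supported = coordinator.supports_format?('json')
xml_supported = coordinator.supports_format?('xml')
formats = coordinator.supported_formats
```

Forwarding keeps the coordinator free of format bookkeeping: any change in how the registry tracks formats is picked up without touching the coordinator.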
#unregister_plugin(name) ⇒ Boolean
Unregister a plugin (delegates to PluginRegistry)
# File 'lib/tasker/telemetry/export_coordinator.rb', line 64
def unregister_plugin(name)
  # Get plugin info before unregistering
  plugin_info = @plugin_registry.get_plugin(name.to_s)

  result = @plugin_registry.unregister(name)

  if result && plugin_info
    publish_event(Events::ExportEvents::PLUGIN_UNREGISTERED, {
      plugin_name: name.to_s,
      plugin_class: plugin_info.class.name,
      timestamp: Time.current.iso8601
    })

    log_structured(:info, 'Export plugin unregistered via coordinator',
                   entity_type: 'export_plugin',
                   entity_id: name.to_s,
                   plugin_name: name.to_s,
                   plugin_class: plugin_info.class.name,
                   event_type: :unregistered)
  end

  result
end