Class: Tasker::Telemetry::ExportCoordinator

Inherits:
Object
  • Object
show all
Includes:
Singleton, Concerns::StructuredLogging
Defined in:
lib/tasker/telemetry/export_coordinator.rb

Overview

ExportCoordinator manages export coordination and integrates with PluginRegistry

This class now uses the unified PluginRegistry for plugin management, eliminating duplication and providing consistent plugin handling across the telemetry system.

Constant Summary

Constants included from Concerns::StructuredLogging

Concerns::StructuredLogging::CORRELATION_ID_KEY

Instance Method Summary collapse

Methods included from Concerns::StructuredLogging

#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id

Constructor Details

#initialize ⇒ ExportCoordinator

Returns a new instance of ExportCoordinator.



18
19
20
21
22
23
24
25
26
# File 'lib/tasker/telemetry/export_coordinator.rb', line 18

# Sets up the coordinator: grabs the shared PluginRegistry and event
# publisher instances, creates a mutex, records the creation time, and emits
# an `initialized` structured log entry.
#
# @return [ExportCoordinator] a new instance of ExportCoordinator
def initialize
  @plugin_registry = PluginRegistry.instance
  @event_bus = Tasker::Events::Publisher.instance
  @mutex = Mutex.new
  # Captured once here so #stats can report a stable creation time instead
  # of falling back to "now" on every call.
  @initialized_at = Time.current

  log_structured(:info, 'ExportCoordinator initialized',
                 entity_type: 'export_coordinator',
                 event_type: :initialized)
end

Instance Method Details

#clear_plugins! ⇒ Boolean

Legacy method for backward compatibility (now delegates to PluginRegistry)

Returns:

  • (Boolean)

    True if cleared successfully



341
342
343
# File 'lib/tasker/telemetry/export_coordinator.rb', line 341

# Legacy entry point kept for backward compatibility; the work is handed
# straight to the PluginRegistry.
#
# @return [Boolean] True if cleared successfully (this is the value returned
#   by the registry's clear!)
def clear_plugins!
  registry = @plugin_registry
  registry.clear!
end

#coordinate_cache_sync(sync_result) ⇒ Object

Coordinate cache sync event

Parameters:

  • sync_result (Hash)

    Result from cache sync operation



117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/tasker/telemetry/export_coordinator.rb', line 117

# Announces a finished cache sync: publishes a CACHE_SYNCED event that
# summarises the sync, then forwards the raw result to the plugins.
#
# @param sync_result [Hash] Result from cache sync operation; its :strategy,
#   :metrics_count, :duration_ms and :success entries are copied into the
#   event payload
# @return [Object] whatever notify_plugins returns
def coordinate_cache_sync(sync_result)
  event_name = Events::ExportEvents::CACHE_SYNCED

  # Copy the summary fields one by one so absent keys still appear as nil.
  payload = %i[strategy metrics_count duration_ms success].to_h { |key| [key, sync_result[key]] }
  payload[:timestamp] = Time.current.iso8601

  publish_event(event_name, payload)

  # Plugins receive the untouched sync result rather than the event payload.
  notify_plugins(:on_cache_sync, sync_result)
end

#coordinate_export_completion(export_id, result) ⇒ Object

Coordinate export completion

Parameters:

  • export_id (String)

    Export identifier

  • result (Hash)

    Export result



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/tasker/telemetry/export_coordinator.rb', line 159

# Reports the outcome of an export: publishes EXPORT_COMPLETED when
# result[:success] is truthy and EXPORT_FAILED otherwise, then notifies the
# plugins through the :on_export_complete hook in both cases.
#
# @param export_id [String] Export identifier
# @param result [Hash] Export result; :success selects the event, :format is
#   always reported, :metrics_count/:duration_ms on success, :error on failure
# @return [Object] whatever notify_plugins returns
def coordinate_export_completion(export_id, result)
  succeeded = result[:success]
  payload = { export_id: export_id, format: result[:format] }

  if succeeded
    event_name = Events::ExportEvents::EXPORT_COMPLETED
    payload[:metrics_count] = result[:metrics_count]
    payload[:duration_ms] = result[:duration_ms]
  else
    event_name = Events::ExportEvents::EXPORT_FAILED
    payload[:error] = result[:error]
  end
  payload[:timestamp] = Time.current.iso8601

  publish_event(event_name, payload)

  # Plugins get the full, unfiltered result regardless of the outcome.
  notify_plugins(:on_export_complete, { export_id: export_id, result: result })
end

#coordinate_export_request(export_format, options = {}) ⇒ String

Coordinate export request

Parameters:

  • export_format (String)

    Requested export format

  • options (Hash) (defaults to: {})

    Export options

Returns:

  • (String)

    Export ID



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/tasker/telemetry/export_coordinator.rb', line 135

# Opens a new export: generates a fresh UUID, publishes EXPORT_REQUESTED with
# it, and tells the plugins about the request via :on_export_request.
#
# @param export_format [String] Requested export format
# @param options [Hash] Export options (defaults to: {})
# @return [String] Export ID (the generated UUID)
def coordinate_export_request(export_format, options = {})
  SecureRandom.uuid.tap do |export_id|
    request = { export_id: export_id, format: export_format, options: options }

    # The published event carries a timestamp; the plugin notification does not.
    publish_event(Events::ExportEvents::EXPORT_REQUESTED,
                  request.merge(timestamp: Time.current.iso8601))
    notify_plugins(:on_export_request, request)
  end
end

#execute_coordinated_export(format:, include_instances: false, **options) ⇒ Hash

Execute coordinated export with distributed locking and plugin coordination

Parameters:

  • format (Symbol)

    Export format (e.g., :prometheus, :json, :csv)

  • include_instances (Boolean) (defaults to: false)

    Whether to include instance-specific data

  • options (Hash)

    Additional export options

Returns:

  • (Hash)

    Export result with success status and data or error



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/tasker/telemetry/export_coordinator.rb', line 190

# Runs one export end to end: validates the format, announces the request on
# the event bus, pulls metrics from the MetricsBackend, hands them to
# coordinate_plugin_export, and publishes/logs the outcome.
#
# A StandardError raised while exporting is converted into a failure result
# rather than propagated. Publishing the EXPORT_FAILED notification from the
# error handler is best-effort, so a broken event bus cannot turn the failure
# result back into an exception.
#
# NOTE(review): no lock is taken in this method body; any locking would have
# to happen inside the helpers it calls — confirm.
#
# @param format [Symbol] Export format (e.g., :prometheus, :json, :csv)
# @param include_instances [Boolean] Whether to include instance-specific data
#   (selects export_distributed_metrics instead of export_metrics)
# @param options [Hash] Additional export options; within this method they are
#   only logged and echoed in the EXPORT_REQUESTED event
# @return [Hash] Export result with success status and data or error; every
#   result carries :format, :duration_ms and :correlation_id
def execute_coordinated_export(format:, include_instances: false, **options)
  correlation_id = SecureRandom.uuid
  start_time = Time.current
  # Milliseconds elapsed since the export started, rounded to two decimals.
  elapsed_ms = -> { ((Time.current - start_time) * 1000).round(2) }

  log_structured(
    :info,
    'Starting coordinated export',
    entity_type: 'export_coordination',
    entity_id: correlation_id,
    format: format,
    include_instances: include_instances,
    options: options
  )

  begin
    # Reject unknown formats up front. No EXPORT_REQUESTED event has been
    # published yet, so nothing is published for the rejection either.
    unless supports_format?(format)
      log_structured(
        :warn,
        'Coordinated export rejected: unsupported format',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format
      )

      return {
        success: false,
        format: format,
        error: "Unsupported export format: #{format}",
        supported_formats: supported_formats,
        duration_ms: elapsed_ms.call,
        correlation_id: correlation_id
      }
    end

    # Publish export requested event
    @event_bus.publish(
      Tasker::Telemetry::Events::ExportEvents::EXPORT_REQUESTED,
      {
        correlation_id: correlation_id,
        format: format,
        include_instances: include_instances,
        options: options,
        timestamp: start_time
      }
    )

    # Get metrics data from backend
    metrics_backend = Tasker::Telemetry::MetricsBackend.instance
    metrics_data = if include_instances
                     metrics_backend.export_distributed_metrics
                   else
                     metrics_backend.export_metrics
                   end

    # Execute plugin-based export using PluginRegistry
    export_result = coordinate_plugin_export(format, metrics_data, correlation_id)

    duration_ms = elapsed_ms.call

    if export_result[:success]
      result = {
        success: true,
        format: format,
        data: export_result[:data],
        metrics_count: metrics_data.size,
        duration_ms: duration_ms,
        correlation_id: correlation_id
      }

      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::EXPORT_COMPLETED,
        result.merge(timestamp: Time.current)
      )

      log_structured(
        :info,
        'Coordinated export completed successfully',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format,
        metrics_count: result[:metrics_count],
        duration_ms: duration_ms
      )
    else
      result = {
        success: false,
        format: format,
        error: export_result[:error],
        duration_ms: duration_ms,
        correlation_id: correlation_id
      }

      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::EXPORT_FAILED,
        result.merge(timestamp: Time.current)
      )

      log_structured(
        :error,
        'Coordinated export failed',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format,
        error: export_result[:error],
        duration_ms: duration_ms
      )
    end

    result
  rescue StandardError => e
    duration_ms = elapsed_ms.call

    result = {
      success: false,
      format: format,
      error: e.message,
      duration_ms: duration_ms,
      correlation_id: correlation_id
    }

    begin
      @event_bus.publish(
        Tasker::Telemetry::Events::ExportEvents::EXPORT_FAILED,
        result.merge(timestamp: Time.current)
      )
    rescue StandardError => publish_error
      # The bus itself may be what failed in the first place; the failure
      # notification must not mask the result we are about to return.
      log_structured(
        :warn,
        'Failed to publish export failure event',
        entity_type: 'export_coordination',
        entity_id: correlation_id,
        format: format,
        error: publish_error.message,
        error_class: publish_error.class.name
      )
    end

    log_structured(
      :error,
      'Coordinated export exception',
      entity_type: 'export_coordination',
      entity_id: correlation_id,
      format: format,
      error: e.message,
      error_class: e.class.name,
      duration_ms: duration_ms
    )

    result
  end
end

#extend_cache_ttl(extension_duration) ⇒ Object

Extend cache TTL for distributed scenarios

Parameters:

  • extension_duration (Integer)

    Extension duration in seconds



348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/tasker/telemetry/export_coordinator.rb', line 348

# Asks the CacheBackend to extend its TTL and reports the outcome.
#
# On success a CACHE_TTL_EXTENDED event is published and an info entry is
# logged; on an unsuccessful backend result only a warning is logged. Any
# StandardError raised after the initial log line is logged and converted
# into a failure hash.
#
# @param extension_duration [Integer] Extension duration in seconds
# @return [Hash] the backend's result, or { success: false, error: message }
#   when an exception was rescued
def extend_cache_ttl(extension_duration)
  correlation_id = SecureRandom.uuid
  # Fields shared by every log entry written for this call.
  log_context = {
    entity_type: 'cache_coordination',
    entity_id: correlation_id,
    extension_duration: extension_duration
  }

  log_structured(:info, 'Extending cache TTL for distributed export', **log_context)

  begin
    outcome = Tasker::Telemetry::CacheBackend.instance.extend_ttl(extension_duration)

    unless outcome[:success]
      log_structured(:warn, 'Failed to extend cache TTL', **log_context, error: outcome[:error])
      return outcome
    end

    new_ttl = outcome[:new_ttl]
    @event_bus.publish(
      Tasker::Telemetry::Events::ExportEvents::CACHE_TTL_EXTENDED,
      {
        correlation_id: correlation_id,
        extension_duration: extension_duration,
        new_ttl: new_ttl,
        timestamp: Time.current
      }
    )
    log_structured(:info, 'Cache TTL extended successfully', **log_context, new_ttl: outcome[:new_ttl])

    outcome
  rescue StandardError => e
    log_structured(:error, 'Cache TTL extension exception',
                   **log_context, error: e.message, error_class: e.class.name)

    { success: false, error: e.message }
  end
end

#plugins_for_format(format) ⇒ Array<Object>

Get plugins that support a specific format

Parameters:

  • format (String, Symbol)

    Format to search for

Returns:

  • (Array<Object>)

    Array of plugin instances



99
100
101
# File 'lib/tasker/telemetry/export_coordinator.rb', line 99

# Looks up plugins by format through the PluginRegistry's find_by query.
#
# @param format [String, Symbol] Format to search for
# @return [Array<Object>] Array of plugin instances (as returned by find_by)
def plugins_for_format(format)
  criteria = { format: format }
  @plugin_registry.find_by(**criteria)
end

#register_plugin(name, plugin, replace: false, **options) ⇒ Boolean

Register a plugin for export coordination (delegates to PluginRegistry)

Parameters:

  • name (String, Symbol)

    Plugin identifier

  • plugin (Object)

    Plugin instance implementing required interface

  • replace (Boolean) (defaults to: false)

    Whether to replace existing plugin

  • options (Hash)

    Plugin configuration options

Returns:

  • (Boolean)

    True if registration successful



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/tasker/telemetry/export_coordinator.rb', line 35

# Registers a plugin with the PluginRegistry and, when the registry reports
# success, publishes PLUGIN_REGISTERED and writes a structured log entry.
#
# @param name [String, Symbol] Plugin identifier
# @param plugin [Object] Plugin instance implementing required interface
# @param replace [Boolean] Whether to replace existing plugin (defaults to: false)
# @param options [Hash] Plugin configuration options, forwarded to the registry
# @return [Boolean] True if registration successful (the registry's return value)
def register_plugin(name, plugin, replace: false, **options)
  registered = @plugin_registry.register(name, plugin, replace: replace, **options)
  # Nothing to announce when the registry refused the plugin.
  return registered unless registered

  details = {
    plugin_name: name.to_s,
    plugin_class: plugin.class.name,
    options: options.merge(replace: replace)
  }

  publish_event(Events::ExportEvents::PLUGIN_REGISTERED,
                details.merge(timestamp: Time.current.iso8601))

  log_structured(:info, 'Export plugin registered via coordinator',
                 entity_type: 'export_plugin',
                 entity_id: name.to_s,
                 **details,
                 event_type: :registered)

  registered
end

#registered_plugins ⇒ Hash

Get registered plugins (delegates to PluginRegistry)

Returns:

  • (Hash)

    Registered plugins



91
92
93
# File 'lib/tasker/telemetry/export_coordinator.rb', line 91

# Exposes the registry's plugin collection (delegates to PluginRegistry).
#
# @return [Hash] Registered plugins, as returned by the registry's all_plugins
def registered_plugins
  registry = @plugin_registry
  registry.all_plugins
end

#stats ⇒ Hash

Get comprehensive export coordination statistics

Returns:

  • (Hash)

    Detailed statistics about export coordination



323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/tasker/telemetry/export_coordinator.rb', line 323

# Builds a statistics report from the PluginRegistry's own stats.
#
# The :export_coordinator section repeats selected registry figures alongside
# an :initialized_at value, which falls back to the current time when
# @initialized_at is unset. The full registry stats are returned untouched
# under :plugin_registry_stats.
#
# @return [Hash] Detailed statistics about export coordination
def stats
  registry_stats = @plugin_registry.stats

  summary = { initialized_at: @initialized_at || Time.current }
  %i[total_plugins supported_formats plugins_by_format average_formats_per_plugin].each do |key|
    summary[key] = registry_stats[key]
  end

  { export_coordinator: summary, plugin_registry_stats: registry_stats }
end

#supported_formats ⇒ Array<String>

Get all supported formats across registered plugins

Returns:

  • (Array<String>)

    Array of supported format names



112
# File 'lib/tasker/telemetry/export_coordinator.rb', line 112

# Defines #supported_formats by forwarding the call to the object held in
# @plugin_registry.
delegate :supported_formats, to: :@plugin_registry

#supports_format?(format) ⇒ Boolean

Check if a format is supported by any registered plugin

Parameters:

  • format (String, Symbol)

    Format to check

Returns:

  • (Boolean)

    True if format is supported



107
# File 'lib/tasker/telemetry/export_coordinator.rb', line 107

# Defines #supports_format? by forwarding the call (and its arguments) to the
# object held in @plugin_registry.
delegate :supports_format?, to: :@plugin_registry

#unregister_plugin(name) ⇒ Boolean

Unregister a plugin (delegates to PluginRegistry)

Parameters:

  • name (String, Symbol)

    Plugin identifier

Returns:

  • (Boolean)

    True if unregistered successfully



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/tasker/telemetry/export_coordinator.rb', line 64

# Removes a plugin from the PluginRegistry. When the registry reports success
# and the plugin was found beforehand, publishes PLUGIN_UNREGISTERED and
# writes a structured log entry.
#
# @param name [String, Symbol] Plugin identifier
# @return [Boolean] True if unregistered successfully (the registry's return value)
def unregister_plugin(name)
  plugin_name = name.to_s

  # Fetch the entry before removal so its class can still be reported.
  plugin_info = @plugin_registry.get_plugin(plugin_name)
  removed = @plugin_registry.unregister(name)
  return removed unless removed && plugin_info

  plugin_class = plugin_info.class.name

  publish_event(Events::ExportEvents::PLUGIN_UNREGISTERED, {
                  plugin_name: plugin_name,
                  plugin_class: plugin_class,
                  timestamp: Time.current.iso8601
                })

  log_structured(:info, 'Export plugin unregistered via coordinator',
                 entity_type: 'export_plugin',
                 entity_id: plugin_name,
                 plugin_name: plugin_name,
                 plugin_class: plugin_class,
                 event_type: :unregistered)

  removed
end