Class: Tasker::AnalyticsService

Inherits: Object
Defined in:
app/services/tasker/analytics_service.rb

Overview

Service class for analytics calculations and data aggregation

This service encapsulates the complex analytics logic that was previously in the analytics controller, providing a clean separation of concerns. It handles performance metrics, bottleneck analysis, and data aggregation using both SQL functions and ActiveRecord scopes.

Defined Under Namespace

Classes: BottleneckAnalytics, PerformanceAnalytics

Class Method Summary

Class Method Details

.analyze_dependency_bottlenecks(scope_params, since_time) ⇒ Hash

Analyze dependency bottlenecks using model scopes and SQL functions

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Analysis start time

Returns:

  • (Hash)

    Dependency bottleneck analysis



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'app/services/tasker/analytics_service.rb', line 238

# Analyze dependency bottlenecks using model scopes and SQL functions.
#
# Inspects pending workflow steps for the scoped tasks: counts those that
# sit behind at least one incoming edge, and averages (in SQL, via
# EXTRACT(EPOCH ...)) how long steps older than five minutes have waited.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param since_time [Time] analysis start time
# @return [Hash] dependency bottleneck analysis; defaults on empty scope or error
def self.analyze_dependency_bottlenecks(scope_params, since_time)
  ids = build_scoped_query(scope_params, since_time).pluck(:task_id)
  return default_dependency_bottlenecks if ids.empty?

  # Reuse existing model scopes where possible
  pending = WorkflowStep.joins(:named_step)
                        .where(task_id: ids)
                        .by_current_state('pending')

  blocked_total = pending.joins(:incoming_edges).count
  # Beginless range keeps the filter to steps created more than 5 minutes ago
  mean_wait = pending.where(tasker_workflow_steps: { created_at: ...5.minutes.ago })
                     .average('EXTRACT(EPOCH FROM (NOW() - tasker_workflow_steps.created_at))')

  {
    blocking_dependencies: blocked_total,
    avg_wait_time: mean_wait&.to_f&.round(1) || 0.0,
    most_blocked_steps: find_most_blocked_step_names(ids)
  }
rescue StandardError => e
  Rails.logger.error "Error in analyze_dependency_bottlenecks: #{e.message}"
  default_dependency_bottlenecks
end

.analyze_error_patterns(scope_params, since_time) ⇒ Hash

Analyze error patterns using model scopes

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Analysis start time

Returns:

  • (Hash)

    Error pattern analysis



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'app/services/tasker/analytics_service.rb', line 213

# Analyze error patterns using model scopes.
#
# Computes the failure rate of tasks created since +since_time+ within the
# given scope, plus the retry success rate for those tasks.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param since_time [Time] analysis start time
# @return [Hash] error pattern analysis; defaults on empty scope or error
def self.analyze_error_patterns(scope_params, since_time)
  scoped_query = build_scoped_query(scope_params, since_time)

  total_tasks = scoped_query.count
  return default_error_pattern if total_tasks.zero?

  failed_tasks = scoped_query.failed_since(since_time).count
  # total_tasks is guaranteed positive here (zero? guard above), so the
  # previous `total_tasks.positive? ? ... : 0.0` ternary was dead code.
  error_rate = (failed_tasks.to_f / total_tasks * 100).round(1)

  {
    total_errors: failed_tasks,
    recent_error_rate: error_rate,
    common_error_types: %w[timeout validation network], # Static for now
    retry_success_rate: calculate_retry_success_rate(scoped_query)
  }
rescue StandardError => e
  Rails.logger.error "Error in analyze_error_patterns: #{e.message}"
  default_error_pattern
end

.build_scoped_query(scope_params, since_time) ⇒ ActiveRecord::Relation

Build scoped query using ActiveRecord scopes

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Time filter

Returns:

  • (ActiveRecord::Relation)

    Filtered query



328
329
330
331
332
333
334
# File 'app/services/tasker/analytics_service.rb', line 328

# Build a scoped query using ActiveRecord scopes.
#
# Starts from tasks created since +since_time+ and layers on namespace,
# name, and version filters for whichever scope params are present (truthy).
#
# @param scope_params [Hash] scope parameters (:namespace, :name, :version)
# @param since_time [Time] time filter
# @return [ActiveRecord::Relation] filtered query
def self.build_scoped_query(scope_params, since_time)
  # Scope-method => param-key pairs, applied in a fixed order
  { in_namespace: :namespace, with_task_name: :name, with_version: :version }
    .reduce(Task.created_since(since_time)) do |relation, (scope, key)|
      value = scope_params[key]
      value ? relation.public_send(scope, value) : relation
    end
end

.calculate_bottleneck_analytics(scope_params, period_hours) ⇒ BottleneckAnalytics

Calculate bottleneck analytics for specified scope and period using SQL functions

Parameters:

  • scope_params (Hash)

    Scope parameters (namespace, name, version)

  • period_hours (Integer)

    Analysis period in hours

Returns:



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'app/services/tasker/analytics_service.rb', line 109

# Calculate bottleneck analytics for the given scope and period using SQL functions.
#
# Fetches the slowest tasks/steps (SQL-function backed, each falling back to
# an empty array on failure), then assembles error patterns, dependency
# bottlenecks, a performance distribution, and recommendations.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param period_hours [Integer] analysis period in hours
# @return [BottleneckAnalytics] assembled analytics object
def self.calculate_bottleneck_analytics(scope_params, period_hours)
  window_start = period_hours.hours.ago

  # SQL functions give efficient bottleneck analysis, with fallbacks inside
  task_hotspots = fetch_slowest_tasks(scope_params, window_start)
  step_hotspots = fetch_slowest_steps(scope_params, window_start)

  BottleneckAnalytics.new(
    scope_summary: calculate_scope_summary(scope_params, period_hours),
    bottleneck_analysis: {
      slowest_tasks: task_hotspots,
      slowest_steps: step_hotspots,
      error_patterns: analyze_error_patterns(scope_params, window_start),
      dependency_bottlenecks: analyze_dependency_bottlenecks(scope_params, window_start)
    },
    performance_distribution: calculate_performance_distribution(scope_params, window_start),
    recommendations: generate_recommendations(task_hotspots, step_hotspots),
    scope: scope_params,
    analysis_period_hours: period_hours
  )
end

.calculate_performance_analytics ⇒ PerformanceAnalytics

Calculate comprehensive performance analytics using SQL functions

Returns:



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'app/services/tasker/analytics_service.rb', line 64

# Calculate comprehensive performance analytics using SQL functions.
#
# Builds a system-wide overview from one analytics-metrics SQL function call,
# then re-runs the function per time window (1h / 4h / 24h) to produce trend
# data, and merges in telemetry backend statistics.
#
# @return [PerformanceAnalytics] aggregated analytics object
def self.calculate_performance_analytics
  windows = {
    last_hour: 1.hour.ago,
    last_4_hours: 4.hours.ago,
    last_24_hours: 24.hours.ago
  }

  # Single SQL-function call for the system-level numbers
  overall = Tasker::Functions::FunctionBasedAnalyticsMetrics.call

  overview = {
    active_tasks: overall.active_tasks_count,
    total_namespaces: overall.total_namespaces_count,
    unique_task_types: overall.unique_task_types_count,
    system_health_score: overall.system_health_score
  }

  # One SQL-function call per window, mapped straight into the trends hash
  trends = windows.transform_values do |window_start|
    metrics = Tasker::Functions::FunctionBasedAnalyticsMetrics.call(window_start)
    {
      task_throughput: metrics.task_throughput,
      completion_rate: metrics.completion_rate,
      error_rate: metrics.error_rate,
      avg_task_duration: metrics.avg_task_duration,
      avg_step_duration: metrics.avg_step_duration,
      step_throughput: metrics.step_throughput
    }
  end

  PerformanceAnalytics.new(
    system_overview: overview,
    performance_trends: trends,
    telemetry_insights: calculate_telemetry_insights
  )
end

.calculate_performance_distribution(scope_params, since_time) ⇒ Hash

Calculate performance distribution using model scopes

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Analysis start time

Returns:

  • (Hash)

    Performance distribution data



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'app/services/tasker/analytics_service.rb', line 267

# Calculate performance distribution using model scopes.
#
# NOTE(review): percentiles are hard-coded and bucket counts are fixed
# fractions of the completed total (each rounded independently, so they may
# not sum exactly to the total) — marked "Simplified for now" upstream.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param since_time [Time] analysis start time
# @return [Hash] performance distribution data; defaults on empty scope or error
def self.calculate_performance_distribution(scope_params, since_time)
  finished = build_scoped_query(scope_params, since_time).completed_since(since_time)

  return default_performance_distribution if finished.empty?

  total = finished.count

  # Fixed range => share table drives the simplified bucket counts
  bucket_shares = { '0-10s' => 0.6, '10-30s' => 0.3, '30s+' => 0.1 }
  {
    percentiles: { p50: 15.0, p95: 45.0, p99: 75.0 }, # Simplified for now
    distribution_buckets: bucket_shares.map do |range, share|
      { range: range, count: (total * share).round }
    end
  }
rescue StandardError => e
  Rails.logger.error "Error in calculate_performance_distribution: #{e.message}"
  default_performance_distribution
end

.calculate_retry_success_rate(scoped_query) ⇒ Float

Calculate retry success rate using model scopes

Parameters:

  • scoped_query (ActiveRecord::Relation)

    Scoped task query

Returns:

  • (Float)

    Retry success rate percentage



340
341
342
343
344
345
346
347
348
349
350
351
352
353
# File 'app/services/tasker/analytics_service.rb', line 340

# Calculate retry success rate using model scopes.
#
# Among steps for the scoped tasks that needed more than one attempt,
# computes the percentage that ultimately reached the 'complete' state.
#
# @param scoped_query [ActiveRecord::Relation] scoped task query
# @return [Float] retry success rate percentage (0.0 when nothing retried or on error)
def self.calculate_retry_success_rate(scoped_query)
  ids = scoped_query.pluck(:task_id)
  return 0.0 if ids.empty?

  retried = WorkflowStep.where(task_id: ids).where('attempts > 1')
  retried_total = retried.count
  return 0.0 if retried_total.zero?

  completed_total = retried.by_current_state('complete').count
  (completed_total.to_f / retried_total * 100).round(1)
rescue StandardError => e
  Rails.logger.error "Error calculating retry success rate: #{e.message}"
  0.0
end

.calculate_scope_summary(scope_params, period_hours) ⇒ Hash

Calculate scope summary using SQL functions and scopes

Parameters:

  • scope_params (Hash)

    Scope parameters

  • period_hours (Integer)

    Analysis period

Returns:

  • (Hash)

    Scope summary data



194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'app/services/tasker/analytics_service.rb', line 194

# Calculate scope summary using SQL functions and scopes.
#
# Counts scoped tasks and the number of distinct named-task names among
# them over the analysis window.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param period_hours [Integer] analysis period in hours
# @return [Hash] scope summary data; zeroed counts on error
def self.calculate_scope_summary(scope_params, period_hours)
  scoped = build_scoped_query(scope_params, period_hours.hours.ago)

  {
    total_tasks: scoped.count,
    unique_task_types: scoped.joins(:named_task).distinct.count('tasker_named_tasks.name'),
    time_span_hours: period_hours.to_f
  }
rescue StandardError => e
  Rails.logger.error "Error in calculate_scope_summary: #{e.message}"
  { total_tasks: 0, unique_task_types: 0, time_span_hours: period_hours.to_f }
end

.calculate_telemetry_insights ⇒ Hash

Calculate telemetry insights from trace and log backends

Returns:

  • (Hash)

    Telemetry insights



140
141
142
143
144
145
146
147
148
149
# File 'app/services/tasker/analytics_service.rb', line 140

# Calculate telemetry insights from the trace and log backends.
#
# Pulls stats straight from the three telemetry singletons.
#
# @return [Hash] telemetry insights (trace, log, and event-router stats)
def self.calculate_telemetry_insights
  {
    trace_stats: Tasker::Telemetry::TraceBackend.instance.stats,
    log_stats: Tasker::Telemetry::LogBackend.instance.stats,
    event_router_stats: Tasker::Telemetry::EventRouter.instance.routing_stats
  }
end

.default_dependency_bottlenecks ⇒ Object



390
391
392
393
394
395
396
# File 'app/services/tasker/analytics_service.rb', line 390

# Fallback dependency-bottleneck payload used when the scope is empty or
# analysis raises.
#
# @return [Hash] zeroed bottleneck analysis
def self.default_dependency_bottlenecks
  { blocking_dependencies: 0, avg_wait_time: 0.0, most_blocked_steps: [] }
end

.default_error_pattern ⇒ Object

Default fallback methods for error conditions



381
382
383
384
385
386
387
388
# File 'app/services/tasker/analytics_service.rb', line 381

# Fallback error-pattern payload used when the scope is empty or analysis
# raises.
#
# @return [Hash] zeroed error pattern analysis
def self.default_error_pattern
  { total_errors: 0, recent_error_rate: 0.0, common_error_types: [], retry_success_rate: 0.0 }
end

.default_performance_distribution ⇒ Object



398
399
400
401
402
403
404
405
406
407
# File 'app/services/tasker/analytics_service.rb', line 398

# Fallback performance-distribution payload used when the scope is empty or
# analysis raises.
#
# @return [Hash] zeroed percentiles and empty duration buckets
def self.default_performance_distribution
  {
    percentiles: { p50: 0.0, p95: 0.0, p99: 0.0 },
    distribution_buckets: ['0-10s', '10-30s', '30s+'].map { |range| { range: range, count: 0 } }
  }
end

.fetch_slowest_steps(scope_params, since_time) ⇒ Array

Fetch slowest steps using SQL function with fallback

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Analysis start time

Returns:

  • (Array)

    Array of slowest step data



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'app/services/tasker/analytics_service.rb', line 175

# Fetch the slowest steps via SQL function, with an empty-array fallback.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param since_time [Time] analysis start time
# @return [Array<Hash>] slowest step data ([] if the SQL function fails)
def self.fetch_slowest_steps(scope_params, since_time)
  filters = {
    since_timestamp: since_time,
    limit_count: 10,
    namespace_filter: scope_params[:namespace],
    task_name_filter: scope_params[:name],
    version_filter: scope_params[:version]
  }
  Tasker::Functions::FunctionBasedSlowestSteps.call(**filters).map(&:to_h)
rescue StandardError => e
  Rails.logger.warn "SQL function FunctionBasedSlowestSteps failed: #{e.message}, using fallback"
  []
end

.fetch_slowest_tasks(scope_params, since_time) ⇒ Array

Fetch slowest tasks using SQL function with fallback

Parameters:

  • scope_params (Hash)

    Scope parameters

  • since_time (Time)

    Analysis start time

Returns:

  • (Array)

    Array of slowest task data



156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'app/services/tasker/analytics_service.rb', line 156

# Fetch the slowest tasks via SQL function, with an empty-array fallback.
#
# @param scope_params [Hash] scope parameters (namespace, name, version)
# @param since_time [Time] analysis start time
# @return [Array<Hash>] slowest task data ([] if the SQL function fails)
def self.fetch_slowest_tasks(scope_params, since_time)
  filters = {
    since_timestamp: since_time,
    limit_count: 10,
    namespace_filter: scope_params[:namespace],
    task_name_filter: scope_params[:name],
    version_filter: scope_params[:version]
  }
  Tasker::Functions::FunctionBasedSlowestTasks.call(**filters).map(&:to_h)
rescue StandardError => e
  Rails.logger.warn "SQL function FunctionBasedSlowestTasks failed: #{e.message}, using fallback"
  []
end

.find_most_blocked_step_names(task_ids) ⇒ Array

Find most blocked step names using SQL function for step readiness

Parameters:

  • task_ids (Array)

    Array of task IDs to analyze

Returns:

  • (Array)

    Array of step names that are frequently blocked



359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'app/services/tasker/analytics_service.rb', line 359

# Find the most-blocked step names using the step-readiness SQL function.
#
# A step counts as blocked when it is pending and its dependencies are not
# yet satisfied. Returns up to the three most frequent blocked step names,
# or a static fallback pair when none are found or the function fails.
#
# @param task_ids [Array] task IDs to analyze
# @return [Array<String>] frequently blocked step names
def self.find_most_blocked_step_names(task_ids)
  return [] if task_ids.empty?

  # Existing SQL function gives per-step readiness for the tasks
  statuses = Tasker::Functions::FunctionBasedStepReadinessStatus.for_tasks(task_ids)

  # Names of steps stuck pending behind unsatisfied dependencies
  blocked_names = statuses.filter_map do |status|
    status.name if status.current_state == 'pending' && !status.dependencies_satisfied
  end

  # Count occurrences and keep the three most common names
  top_names = blocked_names.tally.sort_by { |_name, count| -count }.first(3).map(&:first)

  top_names.presence || %w[data_validation external_api_calls]
rescue StandardError => e
  Rails.logger.warn "SQL function FunctionBasedStepReadinessStatus failed: #{e.message}, using fallback"
  %w[data_validation external_api_calls]
end

.generate_recommendations(slowest_tasks, slowest_steps) ⇒ Array

Generate recommendations based on function data

Parameters:

  • slowest_tasks (Array)

    Slowest tasks from SQL function

  • slowest_steps (Array)

    Slowest steps from SQL function

Returns:

  • (Array)

    Array of recommendation strings



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'app/services/tasker/analytics_service.rb', line 294

# Generate recommendations based on SQL-function data.
#
# Flags long-running tasks (any over 60s) and high retry patterns (any step
# with more than one attempt); falls back to generic monitoring advice when
# nothing stands out. At most five recommendations are returned.
#
# NOTE(review): the reported average spans ALL sampled slowest tasks, not
# only those over 60s — confirm that is the intended message.
#
# @param slowest_tasks [Array<Hash>] slowest tasks from SQL function
# @param slowest_steps [Array<Hash>] slowest steps from SQL function
# @return [Array<String>] recommendation strings
def self.generate_recommendations(slowest_tasks, slowest_steps)
  recommendations = []

  # Analyze slowest tasks
  if slowest_tasks.any? { |task| task[:duration_seconds] > 60 }
    # fdiv avoids silent integer truncation when durations are Integers
    avg_duration = slowest_tasks.sum { |task| task[:duration_seconds] }.fdiv(slowest_tasks.size)
    recommendations << "Consider optimizing long-running tasks (#{avg_duration.round(1)}s average)"
  end

  # Analyze step patterns
  if slowest_steps.any? { |step| step[:attempts] > 1 }
    recommendations << 'High retry patterns detected. Review error handling and timeout configurations.'
  end

  # Default recommendations if none generated
  if recommendations.empty?
    recommendations = [
      'Monitor task performance trends regularly',
      'Review timeout configurations for network operations',
      'Consider implementing caching for repeated operations'
    ]
  end

  recommendations.take(5)
rescue StandardError => e
  Rails.logger.error "Error generating recommendations: #{e.message}"
  ['Monitor system performance regularly']
end