Class: Pindo::TaskSystem::PindoTask

Inherits:
Object
  • Object
show all
Defined in:
lib/pindo/module/task/pindo_task.rb

Overview

PindoTask 基类(简化版,所有任务在主线程中执行)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = {}) ⇒ PindoTask

Returns a new instance of PindoTask.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/pindo/module/task/pindo_task.rb', line 43

# Builds a new task in the PENDING state with a freshly generated UUID.
#
# The task type and task key are read from the concrete class
# (`self.class.task_type` / `self.class.task_key`); the base-class versions
# raise NotImplementedError, so only subclasses that define both can be
# instantiated.
#
# @param name [Object] task name, stored as-is in @name
# @param options [Hash] optional settings
# @option options [Object] :priority (TaskPriority::HIGH) scheduling priority
# @option options [Array] :dependencies ([]) IDs of tasks this task depends on
# @option options [Array] :data_dependencies ([]) IDs of tasks whose data this task reads
# @option options [Hash] :context ({}) caller-supplied context
# @option options [Hash] :metadata ({}) caller-supplied metadata
# @option options [Object] :retry_mode (self.class.default_retry_mode)
# @option options [Integer] :retry_count (self.class.default_retry_count)
# @option options [Numeric] :retry_delay (self.class.default_retry_delay)
def initialize(name, options = {})
  @id = SecureRandom.uuid
  @name = name
  @type = self.class.task_type
  @task_key = self.class.task_key
  @priority = options[:priority] || TaskPriority::HIGH
  @status = TaskStatus::PENDING
  @dependencies = options[:dependencies] || []
  @data_dependencies = options[:data_dependencies] || []
  @context = options[:context] || {}
  @metadata = options[:metadata] || {}
  @error = nil
  @result = nil
  @created_at = Time.now
  @started_at = nil
  @finished_at = nil
  @callbacks = {
    before: [],
    after: [],
    on_success: [],
    on_failure: []
  }
  @callbacks_setup = false

  # Retry configuration: explicit options win over the class-level defaults.
  @retry_mode = options[:retry_mode] || self.class.default_retry_mode
  @retry_count = options[:retry_count] || self.class.default_retry_count
  @max_retry_count = @retry_count  # remember the initial retry budget
  @retry_delay = options[:retry_delay] || self.class.default_retry_delay
end

Instance Attribute Details

#callbacks_setupObject

标记回调是否已经设置



40
41
42
# File 'lib/pindo/module/task/pindo_task.rb', line 40

# Returns the flag that marks whether this task's callbacks have already
# been set up (the constructor initialises it to false).
def callbacks_setup
  @callbacks_setup
end

#contextObject

Returns the value of attribute context.



36
37
38
# File 'lib/pindo/module/task/pindo_task.rb', line 36

# Returns the task's context object; the constructor falls back to an empty
# Hash when no :context option is given.
def context
  @context
end

#created_atObject (readonly)

Returns the value of attribute created_at.



37
38
39
# File 'lib/pindo/module/task/pindo_task.rb', line 37

# Returns the Time captured (via Time.now) when the task object was built.
def created_at
  @created_at
end

#data_dependenciesObject (readonly)

Returns the value of attribute data_dependencies.



35
36
37
# File 'lib/pindo/module/task/pindo_task.rb', line 35

# Returns the list of task IDs whose data parameters this task reads
# (an empty Array unless :data_dependencies was supplied at construction).
def data_dependencies
  @data_dependencies
end

#dependenciesObject (readonly)

Returns the value of attribute dependencies.



35
36
37
# File 'lib/pindo/module/task/pindo_task.rb', line 35

# Returns the list of task IDs this task depends on
# (an empty Array unless :dependencies was supplied at construction).
def dependencies
  @dependencies
end

#errorObject

Returns the value of attribute error.



34
35
36
# File 'lib/pindo/module/task/pindo_task.rb', line 34

# Returns the error stored on the task; the constructor initialises it to nil.
def error
  @error
end

#finished_atObject (readonly)

Returns the value of attribute finished_at.



37
38
39
# File 'lib/pindo/module/task/pindo_task.rb', line 37

# Returns the finish timestamp; nil after construction and after
# reset_for_retry.
def finished_at
  @finished_at
end

#idObject

Returns the value of attribute id.



34
35
36
# File 'lib/pindo/module/task/pindo_task.rb', line 34

# Returns the task identifier, assigned from SecureRandom.uuid at construction.
def id
  @id
end

#max_retry_countObject (readonly)

max_retry_count: 初始最大重试次数



39
40
41
# File 'lib/pindo/module/task/pindo_task.rb', line 39

# Returns the initial maximum retry count, i.e. the value @retry_count had
# when the task was constructed.
def max_retry_count
  @max_retry_count
end

#metadataObject

Returns the value of attribute metadata.



36
37
38
# File 'lib/pindo/module/task/pindo_task.rb', line 36

# Returns the task's metadata object; the constructor falls back to an empty
# Hash when no :metadata option is given.
def metadata
  @metadata
end

#nameObject

Returns the value of attribute name.



34
35
36
# File 'lib/pindo/module/task/pindo_task.rb', line 34

# Returns the name passed to the constructor.
def name
  @name
end

#priorityObject (readonly)

Returns the value of attribute priority.



35
36
37
# File 'lib/pindo/module/task/pindo_task.rb', line 35

# Returns the task priority (TaskPriority::HIGH unless :priority was supplied
# at construction).
def priority
  @priority
end

#resultObject

Returns the value of attribute result.



34
35
36
# File 'lib/pindo/module/task/pindo_task.rb', line 34

# Returns the stored task result; nil after construction and after
# reset_for_retry.
def result
  @result
end

#retry_countObject

剩余重试次数



38
39
40
# File 'lib/pindo/module/task/pindo_task.rb', line 38

# Returns the number of retry attempts still remaining.
def retry_count
  @retry_count
end

#retry_delayObject (readonly)

Returns the value of attribute retry_delay.



39
40
41
# File 'lib/pindo/module/task/pindo_task.rb', line 39

# Returns the retry delay: the :retry_delay option if given, otherwise the
# class-level default_retry_delay.
def retry_delay
  @retry_delay
end

#retry_modeObject (readonly)

Returns the value of attribute retry_mode.



39
40
41
# File 'lib/pindo/module/task/pindo_task.rb', line 39

# Returns the retry mode: the :retry_mode option if given, otherwise the
# class-level default_retry_mode.
def retry_mode
  @retry_mode
end

#started_atObject (readonly)

Returns the value of attribute started_at.



37
38
39
# File 'lib/pindo/module/task/pindo_task.rb', line 37

# Returns the start timestamp; nil after construction and after
# reset_for_retry.
def started_at
  @started_at
end

#statusObject

Returns the value of attribute status.



34
35
36
# File 'lib/pindo/module/task/pindo_task.rb', line 34

# Returns the current TaskStatus value (TaskStatus::PENDING right after
# construction).
def status
  @status
end

#task_keyObject (readonly)

Returns the value of attribute task_key.



35
36
37
# File 'lib/pindo/module/task/pindo_task.rb', line 35

# Returns the task key copied from self.class.task_key at construction.
def task_key
  @task_key
end

#task_managerObject

TaskManager 实例(依赖注入)



41
42
43
# File 'lib/pindo/module/task/pindo_task.rb', line 41

# Returns the injected TaskManager instance. It is not assigned by the
# constructor, so it is nil until set from outside.
def task_manager
  @task_manager
end

#typeObject (readonly)

Returns the value of attribute type.



35
36
37
# File 'lib/pindo/module/task/pindo_task.rb', line 35

# Returns the task type copied from self.class.task_type at construction.
def type
  @type
end

Class Method Details

.default_retry_countObject



88
89
90
# File 'lib/pindo/module/task/pindo_task.rb', line 88

# Default retry budget used when the :retry_count option is not given.
#
# @return [Integer] 0
def self.default_retry_count
  0  # no retries by default
end

.default_retry_delayObject



92
93
94
# File 'lib/pindo/module/task/pindo_task.rb', line 92

# Default retry delay used when the :retry_delay option is not given.
#
# @return [Integer] 10
def self.default_retry_delay
  10  # default delay of 10 seconds
end

.default_retry_modeObject

默认重试配置



84
85
86
# File 'lib/pindo/module/task/pindo_task.rb', line 84

# Default retry mode used when the :retry_mode option is not given.
#
# @return [Object] RetryMode::IMMEDIATE
def self.default_retry_mode
  RetryMode::IMMEDIATE
end

.task_keyObject

Raises:

  • (NotImplementedError)


79
80
81
# File 'lib/pindo/module/task/pindo_task.rb', line 79

# Abstract class-level hook: subclasses must override it to return their
# task key. The constructor calls it, so the base class itself cannot be
# instantiated.
#
# Note: NotImplementedError descends from ScriptError, not StandardError, so
# a bare `rescue` will not catch it.
#
# @raise [NotImplementedError] always, in this base implementation
def self.task_key
  raise NotImplementedError, "Subclass must define task_key"
end

.task_typeObject

子类必须实现的方法

Raises:

  • (NotImplementedError)


75
76
77
# File 'lib/pindo/module/task/pindo_task.rb', line 75

# Abstract class-level hook: subclasses must override it to return their
# task type. The constructor calls it, so the base class itself cannot be
# instantiated.
#
# Note: NotImplementedError descends from ScriptError, not StandardError, so
# a bare `rescue` will not catch it.
#
# @raise [NotImplementedError] always, in this base implementation
def self.task_type
  raise NotImplementedError, "Subclass must define task_type"
end

Instance Method Details

#before_retryObject



246
247
248
# File 'lib/pindo/module/task/pindo_task.rb', line 246

# Hook with an empty default body. Subclasses may override it to clean up
# before a retry.
#
# @return [nil]
def before_retry
  # Subclasses may override this to perform cleanup before a retry.
end

#cancelObject

取消任务



117
118
119
120
121
122
123
124
# File 'lib/pindo/module/task/pindo_task.rb', line 117

# Cancels the task, but only while it is still pending.
#
# @return [Boolean] true when the status was switched to CANCELLED,
#   false when the task was in any other state (status left untouched)
def cancel
  return false unless @status == TaskStatus::PENDING

  @status = TaskStatus::CANCELLED
  true
end

#cancelled?Boolean

检查是否已取消(不抛异常)

Returns:

  • (Boolean)


141
142
143
# File 'lib/pindo/module/task/pindo_task.rb', line 141

# Reports whether the task has been cancelled, without raising.
#
# @return [Boolean] true when the status equals TaskStatus::CANCELLED
def cancelled?
  @status == TaskStatus::CANCELLED
end

#check_cancelled!Object

检查是否已取消,如果已取消则抛出异常



134
135
136
137
138
# File 'lib/pindo/module/task/pindo_task.rb', line 134

# Aborts the caller when the task has been cancelled.
#
# @return [nil] when the task is not cancelled
# @raise [TaskCancelledException] when the status is TaskStatus::CANCELLED
def check_cancelled!
  return unless @status == TaskStatus::CANCELLED

  raise TaskCancelledException.new("任务已被取消: #{@name}")
end

#data_paramHash

获取任务的数据参数(用于传递给其他任务)

Returns:

  • (Hash)

    包含任务标识和参数的哈希



179
180
181
182
183
184
185
186
187
# File 'lib/pindo/module/task/pindo_task.rb', line 179

# Describes this task for consumption by other tasks.
#
# @return [Hash] the task's id, type, key and name plus, under :task_param,
#   whatever build_task_param returns
def data_param
  descriptor = { task_id: @id, task_type: @type, task_key: @task_key, task_name: @name }
  descriptor[:task_param] = build_task_param
  descriptor
end

#do_taskObject

执行任务(在主线程中同步执行)



97
98
99
# File 'lib/pindo/module/task/pindo_task.rb', line 97

# Runs the task by delegating to execute_internal and returns its result.
# Per the class overview, tasks execute synchronously on the main thread.
def do_task
  execute_internal
end

#execution_timeObject

执行时间



127
128
129
130
131
# File 'lib/pindo/module/task/pindo_task.rb', line 127

# Elapsed run time of the task.
#
# @return [Float, nil] seconds between the start and the finish timestamp
#   (or Time.now while no finish is recorded); nil when the task has not started
def execution_time
  started = @started_at
  return unless started

  (@finished_at || Time.now) - started
end

#finished?Boolean

检查是否完成

Returns:

  • (Boolean)


102
103
104
# File 'lib/pindo/module/task/pindo_task.rb', line 102

# Reports whether the task has reached a terminal state.
#
# @return [Boolean] true when the status is SUCCESS, FAILED or CANCELLED
def finished?
  terminal_states = [
    TaskStatus::SUCCESS,
    TaskStatus::FAILED,
    TaskStatus::CANCELLED
  ]
  terminal_states.include?(@status)
end

#get_all_data_paramsArray<Hash>

获取所有数据依赖任务的数据参数

Returns:

  • (Array<Hash>)

    数据参数数组



201
202
203
# File 'lib/pindo/module/task/pindo_task.rb', line 201

# Collects the data parameters of every data dependency, in declaration
# order, leaving out dependencies for which get_data_param returns nil.
#
# @return [Array<Hash>] data parameters
def get_all_data_params
  @data_dependencies.each_with_object([]) do |dependency_id, collected|
    param = get_data_param(dependency_id)
    collected << param unless param.nil?
  end
end

#get_all_data_params_by_key(task_key) ⇒ Array<Hash>

获取所有指定 task_key 的数据参数

Parameters:

  • task_key (Symbol)

    任务键

Returns:

  • (Array<Hash>)

    匹配的任务数据参数数组



219
220
221
# File 'lib/pindo/module/task/pindo_task.rb', line 219

# Collects the data parameters of all data dependencies whose :task_key
# equals the given key.
#
# @param task_key [Symbol] task key to match
# @return [Array<Hash>] matching data parameters (possibly empty)
def get_all_data_params_by_key(task_key)
  available = get_all_data_params
  available.select { |entry| entry[:task_key] == task_key }
end

#get_all_dependencies_resultsHash

获取所有依赖任务的结果(按依赖顺序)

Returns:

  • (Hash)

    key 为任务 ID,value 为任务结果



167
168
169
170
171
172
173
# File 'lib/pindo/module/task/pindo_task.rb', line 167

# Gathers the results of all dependency tasks, in dependency order.
#
# @return [Hash] dependency task ID => value returned by
#   get_dependency_result for that ID (which may be nil)
def get_all_dependencies_results
  @dependencies.each_with_object({}) do |dependency_id, collected|
    collected[dependency_id] = get_dependency_result(dependency_id)
  end
end

#get_data_param(task_id) ⇒ Hash?

获取指定数据依赖任务的数据参数

Parameters:

  • task_id (String)

    任务 ID

Returns:

  • (Hash, nil)

    任务的数据参数,如果任务不存在或未完成返回 nil



192
193
194
195
196
197
# File 'lib/pindo/module/task/pindo_task.rb', line 192

# Fetches the data parameters of one data-dependency task.
#
# @param task_id [String] ID of the task to look up
# @return [Hash, nil] the task's data_param; nil when the task cannot be
#   found, is not finished, or did not end with TaskStatus::SUCCESS
def get_data_param(task_id)
  source = get_dependency_task(task_id)
  if source && source.finished? && source.status == TaskStatus::SUCCESS
    source.data_param
  end
end

#get_data_param_by_key(task_key) ⇒ Hash?

根据 task_key 获取数据依赖任务的数据参数

Parameters:

  • task_key (Symbol)

    任务键

Returns:

  • (Hash, nil)

    第一个匹配的任务数据参数



208
209
210
211
212
213
214
# File 'lib/pindo/module/task/pindo_task.rb', line 208

# Finds the data parameters of the first data dependency whose :task_key
# equals the given key. Dependencies are examined in order and the lookup
# stops at the first match.
#
# @param task_key [Symbol] task key to match
# @return [Hash, nil] first matching data parameters, or nil when none match
def get_data_param_by_key(task_key)
  @data_dependencies
    .lazy
    .map { |dependency_id| get_data_param(dependency_id) }
    .find { |candidate| candidate && candidate[:task_key] == task_key }
end

#get_dependency_result(dep_task_id) ⇒ Hash?

获取指定依赖任务的结果

Parameters:

  • dep_task_id (String)

    依赖任务的 ID

Returns:

  • (Hash, nil)

    依赖任务的 result,如果任务不存在或未完成返回 nil



158
159
160
161
162
163
# File 'lib/pindo/module/task/pindo_task.rb', line 158

# Fetches the result of one dependency task.
#
# @param dep_task_id [String] ID of the dependency task
# @return [Object, nil] the dependency's result once its finished? check is
#   true; nil when the task cannot be found or has not finished
def get_dependency_result(dep_task_id)
  upstream = get_dependency_task(dep_task_id)
  upstream.result if upstream && upstream.finished?
end

#get_dependency_task(dep_task_id) ⇒ PindoTask?

获取指定依赖任务

Parameters:

  • dep_task_id (String)

    依赖任务的 ID

Returns:

  • (PindoTask, nil)

    依赖任务对象,如果不存在返回 nil



150
151
152
153
# File 'lib/pindo/module/task/pindo_task.rb', line 150

# Looks up a dependency task through the injected task manager.
#
# @param dep_task_id [String] ID of the dependency task
# @return [Object, nil] whatever the manager's find_task returns; nil when no
#   task manager has been set
def get_dependency_task(dep_task_id)
  manager = @task_manager
  manager ? manager.find_task(dep_task_id) : nil
end

#on(event, &block) ⇒ Object

添加回调



233
234
235
# File 'lib/pindo/module/task/pindo_task.rb', line 233

# Registers a callback block for an event.
#
# Events with no entry in @callbacks are ignored. The constructor registers
# :before, :after, :on_success and :on_failure.
#
# @param event [Symbol] event name
# @param block [Proc] callback to append
# @return [Array, nil] the event's callback list after appending, or nil
#   when the event is unknown
def on(event, &block)
  handlers = @callbacks[event]
  handlers << block if handlers
end

#primary_data_paramHash?

获取主数据参数(第一个数据依赖任务的参数)

Returns:

  • (Hash, nil)

    主数据参数



225
226
227
228
# File 'lib/pindo/module/task/pindo_task.rb', line 225

# Fetches the primary data parameters, i.e. those of the first data
# dependency.
#
# @return [Hash, nil] result of get_data_param for the first data
#   dependency; nil when there are no data dependencies
def primary_data_param
  unless @data_dependencies.empty?
    get_data_param(@data_dependencies.first)
  end
end

#reset_for_retryObject



250
251
252
253
254
255
# File 'lib/pindo/module/task/pindo_task.rb', line 250

# Puts the task back into the PENDING state for another attempt: clears the
# result and both timestamps. @error and the retry counters are not touched
# here.
#
# @return [nil]
def reset_for_retry
  @status = TaskStatus::PENDING
  @result = @started_at = @finished_at = nil
end

#retryable?Boolean

重试相关方法

Returns:

  • (Boolean)


238
239
240
# File 'lib/pindo/module/task/pindo_task.rb', line 238

# Reports whether any retry attempts remain.
#
# @return [Boolean] true when @retry_count is greater than zero
def retryable?
  @retry_count.positive?
end

#running?Boolean

是否正在运行

Returns:

  • (Boolean)


107
108
109
# File 'lib/pindo/module/task/pindo_task.rb', line 107

# Reports whether the task is currently running.
#
# @return [Boolean] true when the status equals TaskStatus::RUNNING
def running?
  @status == TaskStatus::RUNNING
end

#should_retry?(error) ⇒ Boolean

Returns:

  • (Boolean)


242
243
244
# File 'lib/pindo/module/task/pindo_task.rb', line 242

# Decides whether a given error should trigger a retry. The base
# implementation ignores the argument and always answers true.
#
# @param error [Object] the error that occurred (unused here)
# @return [Boolean] always true
def should_retry?(error)
  true  # retry on every error by default; subclasses may override
end

#validateObject

验证任务是否可以执行



112
113
114
# File 'lib/pindo/module/task/pindo_task.rb', line 112

# Checks whether the task can be executed. The base implementation performs
# no checks.
#
# @return [Boolean] always true
def validate
  true
end