Module: Aidp::WorkstreamState

Defined in:
lib/aidp/workstream_state.rb

Overview

Manages per-workstream state (task, iterations, timestamps, event log). State is stored under .aidp/workstreams/<slug>/ in two files: state.json and history.jsonl.

Defined Under Namespace

Classes: Error

Class Method Summary collapse

Class Method Details

.append_event(slug:, project_dir:, type:, data: {}) ⇒ Object

Append event to history.jsonl



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/aidp/workstream_state.rb', line 97

# Appends a single event to the workstream's history.jsonl, one JSON object
# per line.
#
# When worktree_history_file returns a path, the same line is appended to
# that file as well.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param type [String] event type recorded under :type
# @param data [Hash] payload recorded under :data
# @return [Hash] the recorded event (:timestamp, :type, :data)
def append_event(slug:, project_dir:, type:, data: {})
  primary = history_file(slug, project_dir)
  FileUtils.mkdir_p(File.dirname(primary))
  event = {timestamp: Time.now.utc.iso8601, type: type, data: data}
  File.open(primary, "a") { |io| io.puts(JSON.generate(event)) }

  # Mirror to the worktree copy only when one is reported for this slug.
  mirror = worktree_history_file(slug, project_dir)
  return event unless mirror

  FileUtils.mkdir_p(File.dirname(mirror))
  File.open(mirror, "a") { |io| io.puts(JSON.generate(event)) }
  event
end

.auto_complete_stalled(slug:, project_dir:, threshold_seconds: 3600) ⇒ Object

Auto-complete stalled workstreams



142
143
144
145
146
# File 'lib/aidp/workstream_state.rb', line 142

# Completes the workstream when stalled? reports it as stalled, then records
# an "auto_completed" event with reason "stalled".
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param threshold_seconds [Integer] inactivity threshold forwarded to stalled?
# @return [Object, nil] the result of append_event, or nil when not stalled
def auto_complete_stalled(slug:, project_dir:, threshold_seconds: 3600)
  is_stalled = stalled?(slug: slug, project_dir: project_dir, threshold_seconds: threshold_seconds)
  if is_stalled
    complete(slug: slug, project_dir: project_dir)
    append_event(slug: slug, project_dir: project_dir, type: "auto_completed", data: {reason: "stalled"})
  end
end

.complete(slug:, project_dir:) ⇒ Object

Mark workstream as completed



183
184
185
186
187
188
189
190
191
192
# File 'lib/aidp/workstream_state.rb', line 183

# Marks the workstream as completed.
#
# Sets status to "completed" with a completed_at timestamp and records a
# "completed" event carrying the iteration count read before the update.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Hash] {status: "completed"} on success, or {error: "..."} when the
#   workstream has no state or is already completed
def complete(slug:, project_dir:)
  current = read(slug: slug, project_dir: project_dir)
  return {error: "Workstream not found"} unless current

  if current[:status] == "completed"
    {error: "Already completed"}
  else
    finished_at = Time.now.utc.iso8601
    update(slug: slug, project_dir: project_dir, status: "completed", completed_at: finished_at)
    append_event(slug: slug, project_dir: project_dir, type: "completed", data: {iterations: current[:iterations]})
    {status: "completed"}
  end
end

.elapsed_seconds(slug:, project_dir:) ⇒ Object



127
128
129
130
131
# File 'lib/aidp/workstream_state.rb', line 127

# Whole seconds between the workstream's :started_at timestamp and now (UTC).
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Integer] elapsed seconds, or 0 when there is no state or no
#   :started_at value
def elapsed_seconds(slug:, project_dir:)
  snapshot = read(slug: slug, project_dir: project_dir)
  started_at = snapshot && snapshot[:started_at]
  return 0 unless started_at

  now = Time.now.utc
  (now - Time.parse(started_at)).to_i
end

.history_file(slug, project_dir) ⇒ Object



25
26
27
# File 'lib/aidp/workstream_state.rb', line 25

# Path of the history.jsonl event log inside the workstream's directory.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [String] path ending in "history.jsonl"
def history_file(slug, project_dir)
  base = workstream_dir(slug, project_dir)
  File.join(base, "history.jsonl")
end

.increment_iteration(slug:, project_dir:) ⇒ Object

Increment iteration counter and record event



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/aidp/workstream_state.rb', line 83

# Bumps the iteration counter by one, persists the state, and records an
# "iteration" event with the new count.
#
# State is initialized via init when read returns nothing. A workstream whose
# status is "paused" is switched back to "active" (auto-resume on iteration).
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Hash] the updated state
def increment_iteration(slug:, project_dir:)
  state = read(slug: slug, project_dir: project_dir)
  state ||= init(slug: slug, project_dir: project_dir)

  previous = state[:iterations] || 0
  state[:iterations] = previous + 1
  state[:updated_at] = Time.now.utc.iso8601
  if state[:status] == "paused"
    # Iterating on a paused workstream implicitly resumes it.
    state[:status] = "active"
  end

  target = state_file(slug, project_dir)
  write_json(target, state)
  # Keep the worktree copy (if any) in sync.
  mirror_to_worktree(slug, project_dir, state)
  append_event(slug: slug, project_dir: project_dir, type: "iteration", data: {count: state[:iterations]})
  state
end

.init(slug:, project_dir:, task: nil) ⇒ Object

Initialize state for a new workstream



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/aidp/workstream_state.rb', line 43

# Initializes state for a new workstream.
#
# Creates the workstream directory, writes a fresh "active" state with zero
# iterations, mirrors it to the worktree, and records a "created" event.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param task [Object, nil] task description stored under :task
# @return [Hash] the newly created state
def init(slug:, project_dir:, task: nil)
  FileUtils.mkdir_p(workstream_dir(slug, project_dir))

  created = Time.now.utc
  state = {slug: slug, status: "active", task: task}
  # started_at and updated_at begin with the same instant.
  state[:started_at] = created.iso8601
  state[:updated_at] = created.iso8601
  state[:iterations] = 0

  target = state_file(slug, project_dir)
  write_json(target, state)
  # Keep the worktree copy (if any) in sync.
  mirror_to_worktree(slug, project_dir, state)
  append_event(slug: slug, project_dir: project_dir, type: "created", data: {task: task})
  state
end

.mark_removed(slug:, project_dir:) ⇒ Object



148
149
150
151
152
153
154
155
156
# File 'lib/aidp/workstream_state.rb', line 148

# Flags the workstream as removed and records a "removed" event.
#
# A workstream that is still "active" is completed first, so its completion
# is recorded before the status changes to "removed".
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Object] the result of append_event
def mark_removed(slug:, project_dir:)
  existing = read(slug: slug, project_dir: project_dir)
  still_active = existing && existing[:status] == "active"
  complete(slug: slug, project_dir: project_dir) if still_active

  update(slug: slug, project_dir: project_dir, status: "removed")
  append_event(slug: slug, project_dir: project_dir, type: "removed", data: {})
end

.pause(slug:, project_dir:) ⇒ Object

Pause workstream (stop iteration without completion)



159
160
161
162
163
164
165
166
167
168
# File 'lib/aidp/workstream_state.rb', line 159

# Pauses the workstream (stops iteration without completing it).
#
# Sets status to "paused" with a paused_at timestamp and records a "paused"
# event.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Hash] {status: "paused"} on success, or {error: "..."} when the
#   workstream has no state or is already paused
def pause(slug:, project_dir:)
  current = read(slug: slug, project_dir: project_dir)
  return {error: "Workstream not found"} unless current

  if current[:status] == "paused"
    {error: "Already paused"}
  else
    paused_at = Time.now.utc.iso8601
    update(slug: slug, project_dir: project_dir, status: "paused", paused_at: paused_at)
    append_event(slug: slug, project_dir: project_dir, type: "paused", data: {})
    {status: "paused"}
  end
end

.read(slug:, project_dir:) ⇒ Object

Read current state (returns hash or nil)



63
64
65
66
67
68
69
# File 'lib/aidp/workstream_state.rb', line 63

# Reads the current state of a workstream from its state.json.
#
# Anything that is not a usable state hash is reported as nil: a missing
# file, malformed JSON, a file that disappears between the existence check
# and the read, or valid JSON whose top level is not an object (for example
# an array or a bare number). Callers index the result with symbol keys, so
# returning a non-Hash value would make them fail later.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Hash, nil] state with symbolized keys, or nil when unavailable
def read(slug:, project_dir:)
  file = state_file(slug, project_dir)
  return nil unless File.exist?(file)

  parsed = JSON.parse(File.read(file), symbolize_names: true)
  # Only a JSON object is a valid state document.
  parsed.is_a?(Hash) ? parsed : nil
rescue JSON::ParserError, Errno::ENOENT
  # Errno::ENOENT covers the file being removed after the exist? check.
  nil
end

.recent_events(slug:, project_dir:, limit: 5) ⇒ Object

Read the most recent N events, where N is the limit argument (default 5)



116
117
118
119
120
121
122
123
124
125
# File 'lib/aidp/workstream_state.rb', line 116

# Returns the events parsed from the last `limit` lines of history.jsonl.
#
# Lines that fail to parse as JSON (or parse to nil) are dropped, so the
# result may contain fewer than `limit` entries.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param limit [Integer] number of trailing lines to consider
# @return [Array] parsed events, oldest first; empty when the file is missing
def recent_events(slug:, project_dir:, limit: 5)
  path = history_file(slug, project_dir)
  return [] unless File.exist?(path)

  tail = File.readlines(path, chomp: true).last(limit)
  tail.each_with_object([]) do |raw, events|
    parsed =
      begin
        JSON.parse(raw, symbolize_names: true)
      rescue JSON::ParserError
        nil
      end
    events << parsed unless parsed.nil?
  end
end

.resume(slug:, project_dir:) ⇒ Object

Resume workstream (return to active status)



171
172
173
174
175
176
177
178
179
180
# File 'lib/aidp/workstream_state.rb', line 171

# Resumes a paused workstream (returns it to "active" status).
#
# Sets status to "active" with a resumed_at timestamp and records a
# "resumed" event.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [Hash] {status: "active"} on success, or {error: "..."} when the
#   workstream has no state or is not currently paused
def resume(slug:, project_dir:)
  current = read(slug: slug, project_dir: project_dir)
  return {error: "Workstream not found"} unless current
  return {error: "Not paused"} if current[:status] != "paused"

  resumed_at = Time.now.utc.iso8601
  update(slug: slug, project_dir: project_dir, status: "active", resumed_at: resumed_at)
  append_event(slug: slug, project_dir: project_dir, type: "resumed", data: {})
  {status: "active"}
end

.root_dir(project_dir) ⇒ Object



13
14
15
# File 'lib/aidp/workstream_state.rb', line 13

# Directory under the project that holds all workstream state.
#
# @param project_dir [String] project root directory
# @return [String] "<project_dir>/.aidp/workstreams"
def root_dir(project_dir)
  segments = [project_dir, ".aidp", "workstreams"]
  File.join(*segments)
end

.stalled?(slug:, project_dir:, threshold_seconds: 3600) ⇒ Boolean

Check if workstream appears stalled (no activity for threshold seconds)

Returns:

  • (Boolean)


134
135
136
137
138
139
# File 'lib/aidp/workstream_state.rb', line 134

# Checks whether an active workstream has gone without an update for more
# than threshold_seconds.
#
# Only workstreams whose status is "active" can be stalled; missing state or
# a missing :updated_at also yields false.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param threshold_seconds [Integer] allowed idle time in seconds
# @return [Boolean] true when idle time exceeds the threshold
def stalled?(slug:, project_dir:, threshold_seconds: 3600)
  state = read(slug: slug, project_dir: project_dir)
  last_touch = state && state[:updated_at]
  return false unless last_touch
  return false unless state[:status] == "active"

  now = Time.now.utc
  idle_seconds = (now - Time.parse(last_touch)).to_i
  idle_seconds > threshold_seconds
end

.state_file(slug, project_dir) ⇒ Object



21
22
23
# File 'lib/aidp/workstream_state.rb', line 21

# Path of the state.json file inside the workstream's directory.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [String] path ending in "state.json"
def state_file(slug, project_dir)
  dir = workstream_dir(slug, project_dir)
  File.join(dir, "state.json")
end

.update(slug:, project_dir:, **attrs) ⇒ Object

Update selected attributes; updates updated_at automatically



72
73
74
75
76
77
78
79
80
# File 'lib/aidp/workstream_state.rb', line 72

# Updates selected attributes of the workstream state and persists it.
#
# State is initialized via init when read returns nothing. Attribute keys are
# converted to symbols, and :updated_at is always refreshed to the current
# UTC time.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @param attrs [Hash] attributes to set on the state
# @return [Hash] the updated state
def update(slug:, project_dir:, **attrs)
  state = read(slug: slug, project_dir: project_dir)
  state ||= init(slug: slug, project_dir: project_dir)

  attrs.each { |key, value| state[key.to_sym] = value }
  state[:updated_at] = Time.now.utc.iso8601

  target = state_file(slug, project_dir)
  write_json(target, state)
  # Keep the worktree copy (if any) in sync.
  mirror_to_worktree(slug, project_dir, state)
  state
end

.workstream_dir(slug, project_dir) ⇒ Object



17
18
19
# File 'lib/aidp/workstream_state.rb', line 17

# Directory holding the files of a single workstream.
#
# @param slug [String] workstream identifier, used as the directory name
# @param project_dir [String] project root directory
# @return [String] root_dir(project_dir) joined with slug
def workstream_dir(slug, project_dir)
  parent = root_dir(project_dir)
  File.join(parent, slug)
end

.worktree_history_file(slug, project_dir) ⇒ Object



36
37
38
39
40
# File 'lib/aidp/workstream_state.rb', line 36

# Path of the mirrored history.jsonl inside the workstream's worktree.
#
# The worktree is looked up at <project_dir>/.worktrees/<slug>.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [String, nil] mirror path, or nil when the worktree directory does
#   not exist
def worktree_history_file(slug, project_dir)
  checkout = File.join(project_dir, ".worktrees", slug)
  File.join(checkout, ".aidp", "workstreams", slug, "history.jsonl") if Dir.exist?(checkout)
end

.worktree_state_file(slug, project_dir) ⇒ Object

Per-worktree state files (mirrored for local inspection)



30
31
32
33
34
# File 'lib/aidp/workstream_state.rb', line 30

# Path of the per-worktree state.json (mirrored for local inspection).
#
# The worktree is looked up at <project_dir>/.worktrees/<slug>.
#
# @param slug [String] workstream identifier
# @param project_dir [String] project root directory
# @return [String, nil] mirror path, or nil when the worktree directory does
#   not exist
def worktree_state_file(slug, project_dir)
  checkout = File.join(project_dir, ".worktrees", slug)
  if Dir.exist?(checkout)
    File.join(checkout, ".aidp", "workstreams", slug, "state.json")
  else
    nil
  end
end