Module: Aidp::WorkstreamState
- Defined in:
- lib/aidp/workstream_state.rb
Overview
Manages per-workstream state (task, iterations, timestamps, event log) Stored under: .aidp/workstreams/<slug>/state.json and history.jsonl
Defined Under Namespace
Classes: Error
Class Method Summary collapse
-
.append_event(slug:, project_dir:, type:, data: {}) ⇒ Object
Append event to history.jsonl.
-
.auto_complete_stalled(slug:, project_dir:, threshold_seconds: 3600) ⇒ Object
Auto-complete stalled workstreams.
-
.complete(slug:, project_dir:) ⇒ Object
Mark workstream as completed.
- .elapsed_seconds(slug:, project_dir:) ⇒ Object
- .history_file(slug, project_dir) ⇒ Object
-
.increment_iteration(slug:, project_dir:) ⇒ Object
Increment iteration counter and record event.
-
.init(slug:, project_dir:, task: nil) ⇒ Object
Initialize state for a new workstream.
- .mark_removed(slug:, project_dir:) ⇒ Object
-
.pause(slug:, project_dir:) ⇒ Object
Pause workstream (stop iteration without completion).
-
.read(slug:, project_dir:) ⇒ Object
Read current state (returns hash or nil).
-
.recent_events(slug:, project_dir:, limit: 5) ⇒ Object
Read recent N events.
-
.resume(slug:, project_dir:) ⇒ Object
Resume workstream (return to active status).
- .root_dir(project_dir) ⇒ Object
-
.stalled?(slug:, project_dir:, threshold_seconds: 3600) ⇒ Boolean
Check if workstream appears stalled (no activity for threshold seconds).
- .state_file(slug, project_dir) ⇒ Object
-
.update(slug:, project_dir:, **attrs) ⇒ Object
Update selected attributes; updates updated_at automatically.
- .workstream_dir(slug, project_dir) ⇒ Object
- .worktree_history_file(slug, project_dir) ⇒ Object
-
.worktree_state_file(slug, project_dir) ⇒ Object
Per-worktree state files (mirrored for local inspection).
Class Method Details
.append_event(slug:, project_dir:, type:, data: {}) ⇒ Object
Append event to history.jsonl
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/aidp/workstream_state.rb', line 97 def append_event(slug:, project_dir:, type:, data: {}) file = history_file(slug, project_dir) FileUtils.mkdir_p(File.dirname(file)) event = { timestamp: Time.now.utc.iso8601, type: type, data: data } File.open(file, "a") { |f| f.puts(JSON.generate(event)) } # Mirror to worktree if it exists wt_file = worktree_history_file(slug, project_dir) if wt_file FileUtils.mkdir_p(File.dirname(wt_file)) File.open(wt_file, "a") { |f| f.puts(JSON.generate(event)) } end event end |
.auto_complete_stalled(slug:, project_dir:, threshold_seconds: 3600) ⇒ Object
Auto-complete stalled workstreams
142 143 144 145 146 |
# File 'lib/aidp/workstream_state.rb', line 142 def auto_complete_stalled(slug:, project_dir:, threshold_seconds: 3600) return unless stalled?(slug: slug, project_dir: project_dir, threshold_seconds: threshold_seconds) complete(slug: slug, project_dir: project_dir) append_event(slug: slug, project_dir: project_dir, type: "auto_completed", data: {reason: "stalled"}) end |
.complete(slug:, project_dir:) ⇒ Object
Mark workstream as completed
183 184 185 186 187 188 189 190 191 192 |
# File 'lib/aidp/workstream_state.rb', line 183 def complete(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) return {error: "Workstream not found"} unless state return {error: "Already completed"} if state[:status] == "completed" now = Time.now.utc.iso8601 update(slug: slug, project_dir: project_dir, status: "completed", completed_at: now) append_event(slug: slug, project_dir: project_dir, type: "completed", data: {iterations: state[:iterations]}) {status: "completed"} end |
.elapsed_seconds(slug:, project_dir:) ⇒ Object
127 128 129 130 131 |
# File 'lib/aidp/workstream_state.rb', line 127 def elapsed_seconds(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) return 0 unless state && state[:started_at] (Time.now.utc - Time.parse(state[:started_at])).to_i end |
.history_file(slug, project_dir) ⇒ Object
25 26 27 |
# File 'lib/aidp/workstream_state.rb', line 25 def history_file(slug, project_dir) File.join(workstream_dir(slug, project_dir), "history.jsonl") end |
.increment_iteration(slug:, project_dir:) ⇒ Object
Increment iteration counter and record event
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/aidp/workstream_state.rb', line 83 def increment_iteration(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) || init(slug: slug, project_dir: project_dir) state[:iterations] = (state[:iterations] || 0) + 1 state[:updated_at] = Time.now.utc.iso8601 # Update status to active if paused (auto-resume on iteration) state[:status] = "active" if state[:status] == "paused" write_json(state_file(slug, project_dir), state) # Mirror to worktree if it exists mirror_to_worktree(slug, project_dir, state) append_event(slug: slug, project_dir: project_dir, type: "iteration", data: {count: state[:iterations]}) state end |
.init(slug:, project_dir:, task: nil) ⇒ Object
Initialize state for a new workstream
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/aidp/workstream_state.rb', line 43 def init(slug:, project_dir:, task: nil) dir = workstream_dir(slug, project_dir) FileUtils.mkdir_p(dir) now = Time.now.utc state = { slug: slug, status: "active", task: task, started_at: now.iso8601, updated_at: now.iso8601, iterations: 0 } write_json(state_file(slug, project_dir), state) # Mirror to worktree if it exists mirror_to_worktree(slug, project_dir, state) append_event(slug: slug, project_dir: project_dir, type: "created", data: {task: task}) state end |
.mark_removed(slug:, project_dir:) ⇒ Object
148 149 150 151 152 153 154 155 156 |
# File 'lib/aidp/workstream_state.rb', line 148 def mark_removed(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) # Auto-complete if active when removing if state && state[:status] == "active" complete(slug: slug, project_dir: project_dir) end update(slug: slug, project_dir: project_dir, status: "removed") append_event(slug: slug, project_dir: project_dir, type: "removed", data: {}) end |
.pause(slug:, project_dir:) ⇒ Object
Pause workstream (stop iteration without completion)
159 160 161 162 163 164 165 166 167 168 |
# File 'lib/aidp/workstream_state.rb', line 159 def pause(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) return {error: "Workstream not found"} unless state return {error: "Already paused"} if state[:status] == "paused" now = Time.now.utc.iso8601 update(slug: slug, project_dir: project_dir, status: "paused", paused_at: now) append_event(slug: slug, project_dir: project_dir, type: "paused", data: {}) {status: "paused"} end |
.read(slug:, project_dir:) ⇒ Object
Read current state (returns hash or nil)
63 64 65 66 67 68 69 |
# File 'lib/aidp/workstream_state.rb', line 63 def read(slug:, project_dir:) file = state_file(slug, project_dir) return nil unless File.exist?(file) JSON.parse(File.read(file), symbolize_names: true) rescue JSON::ParserError nil end |
.recent_events(slug:, project_dir:, limit: 5) ⇒ Object
Read recent N events
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/aidp/workstream_state.rb', line 116 def recent_events(slug:, project_dir:, limit: 5) file = history_file(slug, project_dir) return [] unless File.exist?(file) lines = File.readlines(file, chomp: true) lines.last(limit).map do |line| JSON.parse(line, symbolize_names: true) rescue JSON::ParserError nil end.compact end |
.resume(slug:, project_dir:) ⇒ Object
Resume workstream (return to active status)
171 172 173 174 175 176 177 178 179 180 |
# File 'lib/aidp/workstream_state.rb', line 171 def resume(slug:, project_dir:) state = read(slug: slug, project_dir: project_dir) return {error: "Workstream not found"} unless state return {error: "Not paused"} unless state[:status] == "paused" now = Time.now.utc.iso8601 update(slug: slug, project_dir: project_dir, status: "active", resumed_at: now) append_event(slug: slug, project_dir: project_dir, type: "resumed", data: {}) {status: "active"} end |
.root_dir(project_dir) ⇒ Object
13 14 15 |
# File 'lib/aidp/workstream_state.rb', line 13 def root_dir(project_dir) File.join(project_dir, ".aidp", "workstreams") end |
.stalled?(slug:, project_dir:, threshold_seconds: 3600) ⇒ Boolean
Check if workstream appears stalled (no activity for threshold seconds)
134 135 136 137 138 139 |
# File 'lib/aidp/workstream_state.rb', line 134 def stalled?(slug:, project_dir:, threshold_seconds: 3600) state = read(slug: slug, project_dir: project_dir) return false unless state && state[:updated_at] return false if state[:status] != "active" # Only check active workstreams (Time.now.utc - Time.parse(state[:updated_at])).to_i > threshold_seconds end |
.state_file(slug, project_dir) ⇒ Object
21 22 23 |
# File 'lib/aidp/workstream_state.rb', line 21 def state_file(slug, project_dir) File.join(workstream_dir(slug, project_dir), "state.json") end |
.update(slug:, project_dir:, **attrs) ⇒ Object
Update selected attributes; updates updated_at automatically
72 73 74 75 76 77 78 79 80 |
# File 'lib/aidp/workstream_state.rb', line 72 def update(slug:, project_dir:, **attrs) state = read(slug: slug, project_dir: project_dir) || init(slug: slug, project_dir: project_dir) state.merge!(attrs.transform_keys(&:to_sym)) state[:updated_at] = Time.now.utc.iso8601 write_json(state_file(slug, project_dir), state) # Mirror to worktree if it exists mirror_to_worktree(slug, project_dir, state) state end |
.workstream_dir(slug, project_dir) ⇒ Object
17 18 19 |
# File 'lib/aidp/workstream_state.rb', line 17 def workstream_dir(slug, project_dir) File.join(root_dir(project_dir), slug) end |
.worktree_history_file(slug, project_dir) ⇒ Object
36 37 38 39 40 |
# File 'lib/aidp/workstream_state.rb', line 36 def worktree_history_file(slug, project_dir) worktree_path = File.join(project_dir, ".worktrees", slug) return nil unless Dir.exist?(worktree_path) File.join(worktree_path, ".aidp", "workstreams", slug, "history.jsonl") end |
.worktree_state_file(slug, project_dir) ⇒ Object
Per-worktree state files (mirrored for local inspection)
30 31 32 33 34 |
# File 'lib/aidp/workstream_state.rb', line 30 def worktree_state_file(slug, project_dir) worktree_path = File.join(project_dir, ".worktrees", slug) return nil unless Dir.exist?(worktree_path) File.join(worktree_path, ".aidp", "workstreams", slug, "state.json") end |