Class: Roast::Workflow::SqliteStateRepository
- Inherits:
-
StateRepository
- Object
- StateRepository
- Roast::Workflow::SqliteStateRepository
- Defined in:
- lib/roast/workflow/sqlite_state_repository.rb
Overview
SQLite-based implementation of StateRepository Provides structured, queryable session storage with better performance
Constant Summary collapse
- DEFAULT_DB_PATH =
File.("~/.roast/sessions.db")
Instance Method Summary collapse
- #add_event(workflow_path, session_id, event_name, event_data = nil) ⇒ Object
- #cleanup_old_sessions(older_than) ⇒ Object
- #get_session_details(session_id) ⇒ Object
-
#initialize(db_path: nil, session_manager: SessionManager.new) ⇒ SqliteStateRepository
constructor
A new instance of SqliteStateRepository.
-
#list_sessions(status: nil, workflow_name: nil, older_than: nil, limit: 100) ⇒ Object
Additional query methods for the new capabilities.
- #load_state_before_step(workflow, step_name, timestamp: nil) ⇒ Object
- #save_final_output(workflow, output_content) ⇒ Object
- #save_state(workflow, step_name, state_data) ⇒ Object
Constructor Details
#initialize(db_path: nil, session_manager: SessionManager.new) ⇒ SqliteStateRepository
Returns a new instance of SqliteStateRepository.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 12 def initialize(db_path: nil, session_manager: SessionManager.new) super() # Lazy load sqlite3 only when actually using SQLite storage begin require "sqlite3" rescue LoadError raise LoadError, "SQLite storage requires the 'sqlite3' gem. Please add it to your Gemfile or install it: gem install sqlite3" end @db_path = db_path || ENV["ROAST_SESSIONS_DB"] || DEFAULT_DB_PATH @session_manager = session_manager ensure_database end |
Instance Method Details
#add_event(workflow_path, session_id, event_name, event_data = nil) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 180 def add_event(workflow_path, session_id, event_name, event_data = nil) # Find the session if session_id not provided unless session_id workflow_name = File.basename(File.dirname(workflow_path)) result = @db.execute(" SELECT id FROM sessions\n WHERE workflow_name = ? AND status = ?\n ORDER BY created_at DESC\n LIMIT 1\n SQL\n\n raise \"No waiting session found for workflow: \#{workflow_name}\" if result.empty?\n\n session_id = result[0][0]\n end\n\n # Add the event\n @db.execute(<<~SQL, [session_id, event_name, event_data&.to_json])\n INSERT INTO session_events (session_id, event_name, event_data)\n VALUES (?, ?, ?)\n SQL\n\n # Update session status\n @db.execute(<<~SQL, [session_id])\n UPDATE sessions\#{\" \"}\n SET status = 'running', updated_at = CURRENT_TIMESTAMP\n WHERE id = ?\n SQL\n\n session_id\nend\n", [workflow_name, "waiting"]) |
#cleanup_old_sessions(older_than) ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 171 def cleanup_old_sessions(older_than) count = @db.changes @db.execute(" DELETE FROM sessions\n WHERE created_at < datetime('now', ?)\n SQL\n @db.changes - count\nend\n", ["-#{older_than}"]) |
#get_session_details(session_id) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 143 def get_session_details(session_id) session = @db.execute(" SELECT * FROM sessions WHERE id = ?\n SQL\n\n return unless session\n\n states = @db.execute(<<~SQL, [session_id])\n SELECT step_index, step_name, created_at\n FROM session_states\n WHERE session_id = ?\n ORDER BY step_index\n SQL\n\n events = @db.execute(<<~SQL, [session_id])\n SELECT event_name, event_data, received_at\n FROM session_events\n WHERE session_id = ?\n ORDER BY received_at\n SQL\n\n {\n session: session,\n states: states,\n events: events,\n }\nend\n", [session_id]).first |
#list_sessions(status: nil, workflow_name: nil, older_than: nil, limit: 100) ⇒ Object
Additional query methods for the new capabilities
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 112 def list_sessions(status: nil, workflow_name: nil, older_than: nil, limit: 100) conditions = [] params = [] if status conditions << "status = ?" params << status end if workflow_name conditions << "workflow_name = ?" params << workflow_name end if older_than conditions << "created_at < datetime('now', ?)" params << "-#{older_than}" end where_clause = conditions.empty? ? "" : "WHERE #{conditions.join(" AND ")}" @db.execute(" SELECT id, workflow_name, workflow_path, status, current_step_index,\#{\" \"}\n created_at, updated_at\n FROM sessions\n \#{where_clause}\n ORDER BY created_at DESC\n LIMIT \#{limit}\n SQL\nend\n", params) |
#load_state_before_step(workflow, step_name, timestamp: nil) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 47 def load_state_before_step(workflow, step_name, timestamp: nil) session_id = find_session_id(workflow, ) return false unless session_id # Find the state before the target step result = @db.execute(" SELECT state_data, step_name\n FROM session_states\n WHERE session_id = ?\n AND step_index < (\n SELECT MIN(step_index)\#{\" \"}\n FROM session_states\#{\" \"}\n WHERE session_id = ? AND step_name = ?\n )\n ORDER BY step_index DESC\n LIMIT 1\n SQL\n\n if result.empty?\n # Try to find the latest state if target step doesn't exist\n result = @db.execute(<<~SQL, [session_id])\n SELECT state_data, step_name\n FROM session_states\n WHERE session_id = ?\n ORDER BY step_index DESC\n LIMIT 1\n SQL\n\n if result.empty?\n $stderr.puts \"No state found for session\"\n return false\n end\n end\n\n state_data = JSON.parse(result[0][0], symbolize_names: true)\n loaded_step = result[0][1]\n $stderr.puts \"Found state from step: \#{loaded_step} (will replay from here to \#{step_name})\"\n\n # If no timestamp provided and workflow has no session, create new session and copy states\n if !timestamp && workflow.session_timestamp.nil?\n copy_states_to_new_session(workflow, session_id, step_name)\n end\n\n state_data\nend\n", [session_id, step_name]) |
#save_final_output(workflow, output_content) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 93 def save_final_output(workflow, output_content) return if output_content.empty? session_id = ensure_session(workflow) @db.execute(" UPDATE sessions\#{\" \"}\n SET final_output = ?, status = 'completed', updated_at = CURRENT_TIMESTAMP\n WHERE id = ?\n SQL\n\n session_id\nrescue => e\n $stderr.puts \"Failed to save final output: \#{e.message}\"\n nil\nend\n", [output_content, session_id]) |
#save_state(workflow, step_name, state_data) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/roast/workflow/sqlite_state_repository.rb', line 27 def save_state(workflow, step_name, state_data) workflow. ||= @session_manager.create_new_session(workflow.object_id) session_id = ensure_session(workflow) @db.execute(" INSERT INTO session_states (session_id, step_index, step_name, state_data)\n VALUES (?, ?, ?, ?)\n SQL\n\n # Update session's current step\n @db.execute(<<~SQL, [state_data[:order], session_id])\n UPDATE sessions\#{\" \"}\n SET current_step_index = ?, updated_at = CURRENT_TIMESTAMP\n WHERE id = ?\n SQL\nrescue => e\n $stderr.puts \"Failed to save state for step \#{step_name}: \#{e.message}\"\nend\n", [session_id, state_data[:order], step_name, state_data.to_json]) |