Class: Aidp::Security::RuleOfTwoEnforcer
- Inherits:
- Object
- Defined in:
- lib/aidp/security/rule_of_two_enforcer.rb
Overview
Main enforcement engine for the Rule of Two security policy. Tracks trifecta state per work unit and denies operations that would create the lethal trifecta (untrusted_input + private_data + egress).
Usage:
enforcer = RuleOfTwoEnforcer.new
state = enforcer.begin_work_unit(work_unit_id: "unit_123")
state.enable(:untrusted_input, source: "github_issue")
state.enable(:egress, source: "git_push")
# This would raise PolicyViolation:
# state.enable(:private_data, source: "env_var_access")
enforcer.end_work_unit("unit_123")
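The rule behind the usage above can be sketched without the library. The snippet below is a minimal illustration only: `SketchTrifectaState` and `SketchPolicyViolation` are hypothetical stand-ins, since the real `TrifectaState` and `PolicyViolation` classes are not shown on this page and may behave differently.

```ruby
# Hypothetical stand-ins; not the real Aidp::Security classes.
class SketchPolicyViolation < StandardError; end

class SketchTrifectaState
  FLAGS = %i[untrusted_input private_data egress].freeze

  def initialize
    @enabled = {} # flag => source that enabled it
  end

  # True when enabling +flag+ would leave all three flags active at once.
  def would_create_trifecta?(flag)
    FLAGS.all? { |f| f == flag || @enabled.key?(f) }
  end

  def enable(flag, source:)
    raise ArgumentError, "unknown flag: #{flag}" unless FLAGS.include?(flag)
    if would_create_trifecta?(flag)
      raise SketchPolicyViolation, "#{flag} (#{source}) would create the lethal trifecta"
    end

    @enabled[flag] = source
    self
  end

  def to_h
    @enabled.dup
  end
end

state = SketchTrifectaState.new
state.enable(:untrusted_input, source: "github_issue")
state.enable(:egress, source: "git_push")

# The third flag is refused, so at most two are ever active.
begin
  state.enable(:private_data, source: "env_var_access")
  denied = false
rescue SketchPolicyViolation
  denied = true
end
```

Any two of the three flags may be combined; only the third is refused.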
Instance Attribute Summary
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary
-
#active?(work_unit_id) ⇒ Boolean
Check if a work unit is currently active.
-
#active_count ⇒ Object
Get count of active work units.
-
#audit_log ⇒ Object
Audit log of all completed work units with their final states.
-
#begin_work_unit(work_unit_id:) ⇒ TrifectaState
Begin tracking a new work unit.
-
#enabled? ⇒ Boolean
Check if enforcement is currently enabled.
-
#end_work_unit(work_unit_id) ⇒ Hash
End tracking for a work unit.
-
#enforce!(work_unit_id:, flag:, source:) ⇒ Object
Enforce a flag on a work unit - raises PolicyViolation on failure.
-
#initialize(config: {}) ⇒ RuleOfTwoEnforcer
constructor
A new instance of RuleOfTwoEnforcer.
-
#reset! ⇒ Object
Reset enforcer state (primarily for testing).
-
#state_for(work_unit_id) ⇒ TrifectaState?
Get the state for an active work unit.
-
#status_summary ⇒ Object
Get summary of current enforcement status.
-
#with_work_unit(work_unit_id:) {|TrifectaState| ... } ⇒ Object
Create a scoped execution context that automatically manages work unit lifecycle.
-
#would_allow?(work_unit_id, flag) ⇒ Hash
Check if an operation would be allowed for a work unit.
-
#wrap_agent_operation(work_unit_id:, untrusted_input_source: nil, private_data_source: nil, egress_source: nil) { ... } ⇒ Object
Convenience method to wrap an agent operation with security enforcement.
Constructor Details
#initialize(config: {}) ⇒ RuleOfTwoEnforcer
Returns a new instance of RuleOfTwoEnforcer.
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 20
def initialize(config: {})
  @config = config
  @active_states = {}
  @completed_states = []
  @mutex = Mutex.new
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 18
def config
  @config
end
Instance Method Details
#active?(work_unit_id) ⇒ Boolean
Check if a work unit is currently active
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 85
def active?(work_unit_id)
  @mutex.synchronize { @active_states.key?(work_unit_id) }
end
#active_count ⇒ Object
Get count of active work units
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 90
def active_count
  @mutex.synchronize { @active_states.size }
end
#audit_log ⇒ Object
Audit log of all completed work units with their final states
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 164
def audit_log
  @mutex.synchronize { @completed_states.dup }
end
#begin_work_unit(work_unit_id:) ⇒ TrifectaState
Begin tracking a new work unit
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 30
def begin_work_unit(work_unit_id:)
  @mutex.synchronize do
    if @active_states.key?(work_unit_id)
      Aidp.log_warn("security.enforcer", "work_unit_already_active",
        work_unit_id: work_unit_id)
      return @active_states[work_unit_id]
    end

    state = TrifectaState.new(work_unit_id: work_unit_id)
    @active_states[work_unit_id] = state

    Aidp.log_debug("security.enforcer", "work_unit_started",
      work_unit_id: work_unit_id,
      active_count: @active_states.size)

    state
  end
end
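Because a second `begin_work_unit` call for an active ID returns the existing state rather than creating a new one, concurrent callers end up sharing one state object. The following is a rough sketch of that pattern only; `SketchRegistry` is a hypothetical stand-in that uses a plain `Object` in place of `TrifectaState` and omits logging.

```ruby
# Hypothetical stand-in illustrating idempotent registration under a Mutex.
class SketchRegistry
  def initialize
    @states = {}
    @mutex = Mutex.new
  end

  # Returns the existing state for an active ID, or registers a new one.
  def begin_work_unit(work_unit_id:)
    @mutex.synchronize do
      @states[work_unit_id] ||= Object.new
    end
  end

  def active_count
    @mutex.synchronize { @states.size }
  end
end

registry = SketchRegistry.new

# Eight threads race to begin the same work unit.
threads = 8.times.map do
  Thread.new { registry.begin_work_unit(work_unit_id: "unit_123") }
end
states = threads.map(&:value)

shared = states.map(&:object_id).uniq.size == 1
```

Every thread receives the same object, and only one work unit is registered.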
#enabled? ⇒ Boolean
Check if enforcement is currently enabled
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 145
def enabled?
  # Default to enabled unless explicitly disabled
  @config.fetch(:enabled, true)
end
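The `Hash#fetch` default has a few edge cases worth knowing when building the config hash. The sketch below mirrors the lookup in a hypothetical free-standing helper, `enforcement_enabled?`, which is not part of the library.

```ruby
# Hypothetical helper mirroring @config.fetch(:enabled, true).
def enforcement_enabled?(config)
  config.fetch(:enabled, true)
end

default_on   = enforcement_enabled?({})                      # key absent: default applies
explicit_off = enforcement_enabled?({ enabled: false })      # symbol key: honoured
string_key   = enforcement_enabled?({ "enabled" => false })  # string key: not consulted
nil_value    = enforcement_enabled?({ enabled: nil })        # key present: stored nil is returned
```

A config loaded from YAML or JSON with string keys would therefore leave enforcement on unless the keys are symbolized first, and an explicit `nil` is returned as-is (falsy) instead of falling back to `true`.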
#end_work_unit(work_unit_id) ⇒ Hash
End tracking for a work unit. Returns the final state summary, or nil (after logging a warning) if the work unit is not active.
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 52
def end_work_unit(work_unit_id)
  @mutex.synchronize do
    state = @active_states.delete(work_unit_id)
    unless state
      Aidp.log_warn("security.enforcer", "work_unit_not_found",
        work_unit_id: work_unit_id)
      return nil
    end

    summary = state.to_h
    @completed_states << summary

    # Keep only last 100 completed states
    @completed_states.shift if @completed_states.size > 100

    Aidp.log_debug("security.enforcer", "work_unit_ended",
      work_unit_id: work_unit_id,
      final_state: summary,
      active_count: @active_states.size)

    summary
  end
end
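The append-then-shift step keeps the audit log bounded. As an isolated sketch of that technique, using a plain array and a hypothetical `MAX_COMPLETED` constant in place of the literal 100:

```ruby
MAX_COMPLETED = 100 # hypothetical name for the literal used in end_work_unit

completed = []
150.times do |i|
  completed << { work_unit_id: "unit_#{i}" }
  # Drop the oldest entry once the cap is exceeded.
  completed.shift if completed.size > MAX_COMPLETED
end

oldest = completed.first[:work_unit_id]
newest = completed.last[:work_unit_id]
```

After 150 completions the array holds the 100 most recent entries, so the first 50 summaries are no longer available through `audit_log`.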
#enforce!(work_unit_id:, flag:, source:) ⇒ Object
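Enforce a flag on a work unit. Raises PolicyViolation if enabling the flag would create the lethal trifecta; logs a warning and returns nil if the work unit is not active.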
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 130
def enforce!(work_unit_id:, flag:, source:)
  state = state_for(work_unit_id)
  unless state
    Aidp.log_warn("security.enforcer", "enforce_on_inactive_unit",
      work_unit_id: work_unit_id,
      flag: flag,
      source: source)
    return nil
  end

  state.enable(flag, source: source)
end
#reset! ⇒ Object
Reset enforcer state (primarily for testing)
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 169
def reset!
  @mutex.synchronize do
    @active_states.clear
    @completed_states.clear
  end
end
#state_for(work_unit_id) ⇒ TrifectaState?
Get the state for an active work unit
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 80
def state_for(work_unit_id)
  @mutex.synchronize { @active_states[work_unit_id] }
end
#status_summary ⇒ Object
Get summary of current enforcement status
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 151
def status_summary
  @mutex.synchronize do
    {
      enabled: enabled?,
      active_work_units: @active_states.size,
      completed_work_units: @completed_states.size,
      active_states: @active_states.transform_values(&:to_h),
      recent_completions: @completed_states.last(5)
    }
  end
end
#with_work_unit(work_unit_id:) {|TrifectaState| ... } ⇒ Object
Create a scoped execution context that automatically manages work unit lifecycle
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 180
def with_work_unit(work_unit_id:)
  state = begin_work_unit(work_unit_id: work_unit_id)
  begin
    yield state
  ensure
    end_work_unit(work_unit_id)
  end
end
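The `ensure` clause means the work unit is closed whether the block returns normally or raises. A minimal sketch of that lifecycle follows; `SketchTracker` is a hypothetical stand-in that stores plain hashes instead of `TrifectaState` objects and has no locking.

```ruby
# Hypothetical stand-in showing begin/yield/ensure lifecycle management.
class SketchTracker
  attr_reader :active, :completed

  def initialize
    @active = {}
    @completed = []
  end

  def with_work_unit(work_unit_id:)
    state = (@active[work_unit_id] ||= { work_unit_id: work_unit_id })
    begin
      yield state
    ensure
      # Runs on both normal return and exception.
      @completed << @active.delete(work_unit_id)
    end
  end
end

tracker = SketchTracker.new

# Normal path: the block's value is returned to the caller.
result = tracker.with_work_unit(work_unit_id: "ok") { |s| s[:work_unit_id].upcase }

# Failure path: the exception propagates, but the unit is still closed.
begin
  tracker.with_work_unit(work_unit_id: "boom") { raise "agent failed" }
rescue RuntimeError => e
  error_message = e.message
end
```

Both units end up in the completed list and none remain active, which is the property that keeps a failed agent run from leaving stale state behind.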
#would_allow?(work_unit_id, flag) ⇒ Hash
Check if an operation would be allowed for a work unit
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 98
def would_allow?(work_unit_id, flag)
  state = state_for(work_unit_id)
  unless state
    return {
      allowed: true,
      reason: "No active work unit - enforcement not applicable"
    }
  end

  if state.would_create_trifecta?(flag)
    {
      allowed: false,
      reason: "Would create lethal trifecta",
      current_state: state.to_h,
      flag: flag
    }
  else
    {
      allowed: true,
      reason: "Operation allowed",
      current_state: state.to_h,
      flag: flag
    }
  end
end
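Callers should note that the result hash has two shapes: when no work unit is active it carries only `:allowed` and `:reason`, and the answer is `allowed: true`. One way to consume both shapes is sketched below; `describe_decision` is a hypothetical helper, and the `current_state` value is an invented placeholder because the real `TrifectaState#to_h` layout is not shown on this page.

```ruby
# Hypothetical helper that formats either shape of the would_allow? result.
def describe_decision(result)
  verdict = result[:allowed] ? "ALLOW" : "DENY"
  flag = result.fetch(:flag, "n/a") # absent when no work unit is active
  "#{verdict} flag=#{flag}: #{result[:reason]}"
end

no_unit = {
  allowed: true,
  reason: "No active work unit - enforcement not applicable"
}

denied = {
  allowed: false,
  reason: "Would create lethal trifecta",
  current_state: { placeholder: true }, # invented; real layout unknown
  flag: :private_data
}

no_unit_line = describe_decision(no_unit)
denied_line  = describe_decision(denied)
```

Since an unknown work unit ID is reported as allowed, a caller that needs a strict check may want to combine this with `active?` first.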
#wrap_agent_operation(work_unit_id:, untrusted_input_source: nil, private_data_source: nil, egress_source: nil) { ... } ⇒ Object
Convenience method to wrap an agent operation with security enforcement
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 197
def wrap_agent_operation(work_unit_id:, untrusted_input_source: nil, private_data_source: nil, egress_source: nil)
  with_work_unit(work_unit_id: work_unit_id) do |state|
    # Enable flags based on what's provided
    state.enable(:untrusted_input, source: untrusted_input_source) if untrusted_input_source
    state.enable(:private_data, source: private_data_source) if private_data_source
    state.enable(:egress, source: egress_source) if egress_source

    yield state
  end
end