Class: Aidp::Security::RuleOfTwoEnforcer

Inherits:
Object
  • Object
show all
Defined in:
lib/aidp/security/rule_of_two_enforcer.rb

Overview

Main enforcement engine for the Rule of Two security policy. Tracks trifecta state per work unit and denies operations that would create the lethal trifecta (untrusted_input + private_data + egress).

Usage:

enforcer = RuleOfTwoEnforcer.new
state = enforcer.begin_work_unit(work_unit_id: "unit_123")
state.enable(:untrusted_input, source: "github_issue")
state.enable(:egress, source: "git_push")
# This would raise PolicyViolation:
# state.enable(:private_data, source: "env_var_access")
enforcer.end_work_unit("unit_123")

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: {}) ⇒ RuleOfTwoEnforcer

Returns a new instance of RuleOfTwoEnforcer.



20
21
22
23
24
25
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 20

# Build an enforcer with empty tracking tables.
#
# @param config [Hash, nil] enforcement options. The only key read by the
#   methods shown on this page is +:enabled+. A nil value is normalised to an
#   empty hash so later +@config.fetch+ calls cannot fail with NoMethodError.
def initialize(config: {})
  @config = config || {}
  @active_states = {}     # work_unit_id => state object of units still in flight
  @completed_states = []  # summaries of ended units, oldest first
  @mutex = Mutex.new      # guards the two collections above
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



18
19
20
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 18

# Reader for the configuration object handed to the constructor.
#
# @return [Object] the value held in +@config+ (a Hash by default)
def config
  @config
end

Instance Method Details

#active?(work_unit_id) ⇒ Boolean

Check if a work unit is currently active

Returns:

  • (Boolean)


85
86
87
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 85

# Report whether +work_unit_id+ currently has an entry in the active table.
#
# @param work_unit_id [String] identifier to look up
# @return [Boolean] true while the unit is tracked as active
def active?(work_unit_id)
  @mutex.synchronize do
    @active_states.key?(work_unit_id)
  end
end

#active_count ⇒ Object

Get count of active work units



90
91
92
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 90

# Number of work units present in the active table, read under the lock.
#
# @return [Integer]
def active_count
  @mutex.synchronize do
    @active_states.length
  end
end

#audit_log ⇒ Object

Audit log of all completed work units with their final states



164
165
166
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 164

# Snapshot of the summaries recorded for completed work units.
#
# The returned array is a shallow copy, so adding to or removing from it does
# not touch the enforcer's own list; the summary objects inside are shared.
#
# @return [Array] copy of the completed-state list, oldest first
def audit_log
  @mutex.synchronize do
    snapshot = @completed_states.dup
    snapshot
  end
end

#begin_work_unit(work_unit_id:) ⇒ TrifectaState

Begin tracking a new work unit

Parameters:

  • work_unit_id (String)

    Unique identifier for the work unit

Returns:

  • (TrifectaState)



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 30

# Start tracking a work unit, or hand back the state already being tracked.
#
# When the id is already active a warning is logged and the existing state is
# returned unchanged; otherwise a fresh TrifectaState is registered and a
# debug entry is logged. The whole operation runs under the mutex.
#
# @param work_unit_id [String] unique identifier for the work unit
# @return [TrifectaState] the state associated with +work_unit_id+
def begin_work_unit(work_unit_id:)
  @mutex.synchronize do
    if @active_states.key?(work_unit_id)
      Aidp.log_warn("security.enforcer", "work_unit_already_active",
        work_unit_id: work_unit_id)
      @active_states[work_unit_id]
    else
      TrifectaState.new(work_unit_id: work_unit_id).tap do |fresh|
        @active_states[work_unit_id] = fresh
        Aidp.log_debug("security.enforcer", "work_unit_started",
          work_unit_id: work_unit_id,
          active_count: @active_states.size)
      end
    end
  end
end

#enabled? ⇒ Boolean

Check if enforcement is currently enabled

Returns:

  • (Boolean)


145
146
147
148
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 145

# Whether enforcement is switched on according to the configuration.
#
# @return [Boolean] the value stored under +:enabled+, or true when the key
#   is absent (enforcement is opt-out, not opt-in)
def enabled?
  @config.fetch(:enabled) { true }
end

#end_work_unit(work_unit_id) ⇒ Hash

End tracking for a work unit

Parameters:

  • work_unit_id (String)

    Unique identifier for the work unit

Returns:

  • (Hash)

    Final state summary



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 52

# Stop tracking a work unit and archive its final summary.
#
# The state is removed from the active table, converted with +to_h+, and
# appended to the completed list, which is trimmed so it never holds more
# than 100 entries. Unknown ids only produce a warning.
#
# @param work_unit_id [String] unique identifier for the work unit
# @return [Hash, nil] final state summary, or nil when the id was not active
def end_work_unit(work_unit_id)
  @mutex.synchronize do
    finished = @active_states.delete(work_unit_id)

    if finished
      final = finished.to_h
      @completed_states.push(final)

      # Drop the oldest entry once the history exceeds 100 summaries.
      @completed_states.shift if @completed_states.length > 100

      Aidp.log_debug("security.enforcer", "work_unit_ended",
        work_unit_id: work_unit_id,
        final_state: final,
        active_count: @active_states.size)

      final
    else
      Aidp.log_warn("security.enforcer", "work_unit_not_found",
        work_unit_id: work_unit_id)
      nil
    end
  end
end

#enforce!(work_unit_id:, flag:, source:) ⇒ Object

Enforce a flag on a work unit - raises PolicyViolation on failure

Parameters:

  • work_unit_id (String)

    Work unit identifier

  • flag (Symbol)

    The flag to enable

  • source (String)

    Description of the operation causing the flag

Raises:

  • (PolicyViolation)



130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 130

# Enable +flag+ on an active work unit.
#
# Delegates to the state's +enable+; any exception raised there propagates to
# the caller (presumably PolicyViolation on a trifecta - the state class is
# defined outside this listing). When the unit is not active, nothing is
# enforced: a warning is logged and nil is returned.
#
# @param work_unit_id [String] work unit identifier
# @param flag [Symbol] the flag to enable
# @param source [String] description of the operation causing the flag
# @return [Object, nil] result of +enable+, or nil for an inactive unit
def enforce!(work_unit_id:, flag:, source:)
  tracked = state_for(work_unit_id)
  return tracked.enable(flag, source: source) if tracked

  Aidp.log_warn("security.enforcer", "enforce_on_inactive_unit",
    work_unit_id: work_unit_id,
    flag: flag,
    source: source)
  nil
end

#reset! ⇒ Object

Reset enforcer state (primarily for testing)



169
170
171
172
173
174
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 169

# Forget every active and completed work unit (primarily for testing).
#
# Both collections are emptied in place under the lock.
#
# @return [Array] the (now empty) completed-state list
def reset!
  @mutex.synchronize do
    [@active_states, @completed_states].each(&:clear)
    @completed_states
  end
end

#state_for(work_unit_id) ⇒ TrifectaState?

Get the state for an active work unit

Parameters:

  • work_unit_id (String)

    Unique identifier for the work unit

Returns:

  • (TrifectaState, nil)



80
81
82
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 80

# Look up the state object tracked for an active work unit.
#
# @param work_unit_id [String] unique identifier for the work unit
# @return [TrifectaState, nil] the tracked state, or nil when not active
def state_for(work_unit_id)
  @mutex.synchronize do
    @active_states[work_unit_id]
  end
end

#status_summary ⇒ Object

Get summary of current enforcement status



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 151

# Assemble a point-in-time report of the enforcer, taken under the lock.
#
# @return [Hash] with keys +:enabled+, +:active_work_units+,
#   +:completed_work_units+, +:active_states+ (each active state converted
#   with +to_h+) and +:recent_completions+ (the last five summaries)
def status_summary
  @mutex.synchronize do
    report = {enabled: enabled?}
    report[:active_work_units] = @active_states.size
    report[:completed_work_units] = @completed_states.size
    report[:active_states] = @active_states.transform_values { |tracked| tracked.to_h }
    report[:recent_completions] = @completed_states.last(5)
    report
  end
end

#with_work_unit(work_unit_id:) {|TrifectaState| ... } ⇒ Object

Create a scoped execution context that automatically manages work unit lifecycle

Parameters:

  • work_unit_id (String)

    Unique identifier for the work unit

Yields:

  • (TrifectaState)

Returns:

  • (Object)

    The result of the block



180
181
182
183
184
185
186
187
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 180

# Run a block inside a work unit whose lifecycle is managed automatically.
#
# The unit is begun before the block runs and ended afterwards, even when the
# block raises. If the id was already active when this method was entered,
# the existing state is yielded and the unit is left running on exit: the
# scope that opened it stays responsible for ending it. Without that rule a
# nested or re-entrant call would remove the outer caller's state from
# tracking, after which #enforce! and #would_allow? would treat the unit as
# inactive.
#
# The ownership check and the begin call take the lock separately, so two
# threads racing to open the same id may both count as owners; the first one
# to finish then ends the unit.
#
# @param work_unit_id [String] unique identifier for the work unit
# @yield [TrifectaState] the state tracked for the unit
# @return [Object] the result of the block
def with_work_unit(work_unit_id:)
  owns_unit = !active?(work_unit_id)
  state = begin_work_unit(work_unit_id: work_unit_id)
  begin
    yield state
  ensure
    end_work_unit(work_unit_id) if owns_unit
  end
end

#would_allow?(work_unit_id, flag) ⇒ Hash

Check if an operation would be allowed for a work unit

Parameters:

  • work_unit_id (String)

    Work unit identifier

  • flag (Symbol)

    The flag to check (:untrusted_input, :private_data, :egress)

Returns:

  • (Hash)

    { allowed: boolean, reason: string }



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 98

# Dry-run check: would enabling +flag+ on this work unit be permitted?
#
# Nothing is modified. When the unit is not active the answer is always
# "allowed", because there is no state to enforce against.
#
# @param work_unit_id [String] work unit identifier
# @param flag [Symbol] the flag to check (:untrusted_input, :private_data, :egress)
# @return [Hash] +{ allowed:, reason: }+, plus +:current_state+ and +:flag+
#   when the unit is active
def would_allow?(work_unit_id, flag)
  tracked = state_for(work_unit_id)

  unless tracked
    return {allowed: true, reason: "No active work unit - enforcement not applicable"}
  end

  blocked = tracked.would_create_trifecta?(flag)
  verdict = blocked ? "Would create lethal trifecta" : "Operation allowed"

  {
    allowed: !blocked,
    reason: verdict,
    current_state: tracked.to_h,
    flag: flag
  }
end

#wrap_agent_operation(work_unit_id:, untrusted_input_source: nil, private_data_source: nil, egress_source: nil) { ... } ⇒ Object

Convenience method to wrap an agent operation with security enforcement

Parameters:

  • work_unit_id (String)

    Work unit identifier

  • untrusted_input_source (String, nil) (defaults to: nil)

    Source of untrusted input

  • private_data_source (String, nil) (defaults to: nil)

    Source of private data access

  • egress_source (String, nil) (defaults to: nil)

    Source of egress capability

Yields:

  • The operation to execute

Returns:

  • (Object)

    The result of the block

Raises:

  • (PolicyViolation)



197
198
199
200
201
202
203
204
205
206
207
# File 'lib/aidp/security/rule_of_two_enforcer.rb', line 197

# Run an agent operation inside a managed work unit, declaring its
# capabilities up front.
#
# Each non-nil source enables the matching flag on the unit's state, in the
# order untrusted_input, private_data, egress, before the block is yielded
# to. An exception raised while enabling a flag propagates and the block is
# never run.
#
# @param work_unit_id [String] work unit identifier
# @param untrusted_input_source [String, nil] source of untrusted input
# @param private_data_source [String, nil] source of private data access
# @param egress_source [String, nil] source of egress capability
# @yield [state] the operation to execute
# @return [Object] the result of the block
def wrap_agent_operation(work_unit_id:, untrusted_input_source: nil,
  private_data_source: nil, egress_source: nil)
  declared = {
    untrusted_input: untrusted_input_source,
    private_data: private_data_source,
    egress: egress_source
  }

  with_work_unit(work_unit_id: work_unit_id) do |state|
    # Only flags with a source supplied are enabled.
    declared.each do |flag, origin|
      state.enable(flag, source: origin) if origin
    end

    yield state
  end
end