15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'app/models/gaggle/session.rb', line 15
def start_executable
prepare_logging
logger = Logger.new(log_path)
Rails.logger.info "Starting goose session for: #{goose.name}"
self.class.running_executables[to_global_id] = Thread.new(logger) do |session_logger|
Thread.current.name = "gaggle-session-#{to_global_id}"
Thread.current[:input_queue] = input_queue = Queue.new
prompt_mutex = Mutex.new
prompt_cond = ConditionVariable.new
prompt_active = false
server_name = ::MCP::Rails.configuration.for_engine(Gaggle::Engine).server_name
server_path = ::MCP::Rails.configuration.output_directory.join(server_name)
Thread.current[:logger] = session_logger
PTY.spawn("goose session --with-extension \"GOOSE_USER_ID=#{goose.id} #{server_path}_server.rb\"") do |stdout, stdin, pid|
session_logger.info "Spawned PID: #{pid}"
stdout.sync = true
stdin.sync = true
input_thread = Thread.new(input_queue, stdin, prompt_mutex, prompt_cond, name: "input-#{to_global_id}") do |queue, input_stream, mutex, cond|
loop do
mutex.synchronize do
cond.wait(mutex)
next if queue.empty?
begin
command = queue.pop
cmd_to_send = command.is_a?(String) ? "#{command}#{SUBMISSION_SIGNAL}" : command&.to_s || SUBMISSION_SIGNAL
session_logger.info "Sent command: #{command.inspect}"
input_stream.write(cmd_to_send)
input_stream.flush
prompt_active = false rescue StandardError => e
session_logger.error "Input thread error: #{e.class} - #{e.message}"
end
end
end
end
begin
loop do
ready = IO.select([ stdout ], nil, nil, 2)
if ready
line = nil
begin
Timeout.timeout(1) { line = stdout.gets }
rescue Timeout::Error
alive = Process.waitpid(pid, Process::WNOHANG).nil?
break unless alive
next
end
if line
prompt_active = false
session_logger.info "Output: #{line.chomp}"
broadcast_output(line)
else
break
end
else
prompt_mutex.synchronize do
unless prompt_active || input_queue.empty?
prompt_active = true
prompt_cond.signal
end
end
end
end
rescue Errno::EIO, Errno::EPIPE, IOError => e
session_logger.error "Stream error: #{e.class} - #{e.message}"
rescue StandardError => e
session_logger.error "Unexpected error: #{e.class} - #{e.message}"
ensure
stdout.close unless stdout.closed?
stdin.close unless stdin.closed?
input_thread.kill
self.class.running_executables.delete(to_global_id)
session_logger.info "Session thread exiting, PID: #{pid rescue 'unknown'}"
end
end
end
end
|