Class: Consolle::Server::RequestBroker
- Inherits:
- Object
- Inheritance hierarchy:
- Object → Consolle::Server::RequestBroker
- Defined in:
- lib/consolle/server/request_broker.rb
Defined Under Namespace
Classes: RequestFuture
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#supervisor ⇒ Object
readonly
Returns the value of attribute supervisor.
Instance Method Summary collapse
-
#initialize(supervisor:, logger: nil) ⇒ RequestBroker
constructor
A new instance of RequestBroker.
- #process_request(request) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(supervisor:, logger: nil) ⇒ RequestBroker
11 12 13 14 15 16 17 18 19 |
# File 'lib/consolle/server/request_broker.rb', line 11

# Builds a broker in the stopped state; no worker thread exists until
# #start is called.
#
# @param supervisor [Object] stored as-is and exposed through #supervisor
# @param logger [Object, nil] logger to use; when nil (or false) a Logger
#   writing to STDOUT is created instead
def initialize(supervisor:, logger: nil)
  # Collaborators
  @supervisor = supervisor
  @logger = logger
  @logger ||= Logger.new(STDOUT)

  # Lifecycle state
  @running = false
  @worker_thread = nil

  # Request bookkeeping: queue of pending work, map of request id to the
  # future its caller is waiting on, and the mutex used around that map
  @queue = Queue.new
  @request_map = {}
  @mutex = Mutex.new
end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
9 10 11 |
# File 'lib/consolle/server/request_broker.rb', line 9

# Read-only accessor for the logger held in @logger.
# NOTE(review): #logger and #supervisor both report line 9, so this is
# presumably generated from an attr_reader declaration — confirm in the file.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
9 10 11 |
# File 'lib/consolle/server/request_broker.rb', line 9

# Read-only accessor for the supervisor held in @supervisor.
# NOTE(review): #logger and #supervisor both report line 9, so this is
# presumably generated from an attr_reader declaration — confirm in the file.
#
# @return [Object] the current value of @supervisor
def supervisor
  @supervisor
end
Instance Method Details
#process_request(request) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/consolle/server/request_broker.rb', line 43

# Enqueues a request and blocks until its future yields a response or the
# timeout expires.
#
# The timeout (seconds) is chosen in this order: a positive integer in
# ENV['CONSOLLE_TIMEOUT'], then request['timeout'], then 30.
#
# @param request [Hash] string-keyed request; 'request_id', 'action' and
#   'timeout' are the keys read here. A UUID is generated when
#   'request_id' is missing.
# @return [Object] whatever the future returns, or a failure Hash with
#   'error' => 'RequestTimeout' when Timeout::Error is raised while waiting
def process_request(request)
  request_id = request['request_id'] || SecureRandom.uuid
  logger.debug "[RequestBroker] Received request: #{request_id}, action: #{request['action']}" if ENV['DEBUG']

  # Create future for response
  future = RequestFuture.new

  begin
    # Register and enqueue inside the protected region so the ensure below
    # removes the map entry even when enqueueing itself raises.
    @mutex.synchronize do
      @request_map[request_id] = future
    end

    @queue.push({ id: request_id, request: request, timestamp: Time.now })
    logger.debug "[RequestBroker] Queued request: #{request_id}, queue size: #{@queue.size}" if ENV['DEBUG']

    # A positive CONSOLLE_TIMEOUT overrides the per-request timeout;
    # unset, zero, negative or non-numeric values fall through.
    env_timeout = ENV['CONSOLLE_TIMEOUT']&.to_i
    future_timeout = env_timeout && env_timeout > 0 ? env_timeout : (request['timeout'] || 30)
    logger.debug "[RequestBroker] Waiting for response: #{request_id}, timeout: #{future_timeout}" if ENV['DEBUG']

    # Wait for response (with timeout)
    response = future.get(timeout: future_timeout)
    logger.debug "[RequestBroker] Got response: #{request_id}" if ENV['DEBUG']
    response
  rescue Timeout::Error
    logger.debug "[RequestBroker] Request timed out: #{request_id}" if ENV['DEBUG']
    {
      'success' => false,
      'error' => 'RequestTimeout',
      'message' => 'Request timed out',
      'request_id' => request_id
    }
  ensure
    # Clean up: always drop the bookkeeping entry for this request
    @mutex.synchronize do
      @request_map.delete(request_id)
    end
  end
end
#start ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/consolle/server/request_broker.rb', line 21

# Marks the broker as running and launches the worker via start_worker.
# Calling it while already running does nothing and returns nil.
#
# @return [Object, nil] nil when already running, otherwise the result of
#   the logger.info call
def start
  unless @running
    @running = true
    @worker_thread = start_worker
    logger.info '[RequestBroker] Started'
  end
end
#stop ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/consolle/server/request_broker.rb', line 29

# Stops the broker: clears the running flag, wakes the worker and waits up
# to 5 seconds for it to finish. Does nothing when the broker is not
# running.
#
# A worker that is still alive after the wait is left running; a warning is
# logged so the "Stopped" message is not mistaken for a clean shutdown.
#
# @return [Object, nil] nil when not running, otherwise the result of the
#   final logger.info call
def stop
  return unless @running

  @running = false

  # Push poison pill to wake up worker
  @queue.push(nil)

  # Wait for worker to finish. Thread#join with a limit returns nil when the
  # limit expires before the thread ends.
  if @worker_thread && @worker_thread.join(5).nil?
    logger.warn '[RequestBroker] Worker did not stop within 5 seconds'
  end

  logger.info '[RequestBroker] Stopped'
end