Class: RubyLsp::Queue
- Inherits:
-
Object
- Object
- RubyLsp::Queue
- Extended by:
- T::Sig
- Defined in:
- lib/ruby_lsp/queue.rb
Defined Under Namespace
Instance Method Summary collapse
- #cancel(id) ⇒ Object
- #execute(request) ⇒ Object
- #finalize_request(result, request) ⇒ Object
-
#initialize(writer, handlers) ⇒ Queue
constructor
A new instance of Queue.
- #push(request) ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(writer, handlers) ⇒ Queue
Returns a new instance of Queue.
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/ruby_lsp/queue.rb', line 34 def initialize(writer, handlers) @writer = writer @handlers = handlers # The job queue is the actual list of requests we have to process @job_queue = T.let(Thread::Queue.new, Thread::Queue) # The jobs hash is just a way of keeping a handle to jobs based on the request ID, so we can cancel them @jobs = T.let({}, T::Hash[T.any(String, Integer), Job]) @mutex = T.let(Mutex.new, Mutex) @worker = T.let(new_worker, Thread) Thread.main.priority = 1 end |
Instance Method Details
#cancel(id) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/ruby_lsp/queue.rb', line 60 def cancel(id) @mutex.synchronize do # Cancel the job if it's still in the queue @jobs[id]&.cancel end end |
#execute(request) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/ruby_lsp/queue.rb', line 80 def execute(request) response = T.let(nil, T.untyped) error = T.let(nil, T.nilable(Exception)) request_time = Benchmark.realtime do response = T.must(@handlers[request[:method]]).action.call(request) rescue StandardError, LoadError => e error = e end Queue::Result.new(response: response, error: error, request_time: request_time) end |
#finalize_request(result, request) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/ruby_lsp/queue.rb', line 100 def finalize_request(result, request) @mutex.synchronize do error = result.error if error T.must(@handlers[request[:method]]).error_handler&.call(error, request) @writer.write( id: request[:id], error: { code: LanguageServer::Protocol::Constant::ErrorCodes::INTERNAL_ERROR, message: error.inspect, data: request.to_json, }, ) elsif result.response != Handler::VOID @writer.write(id: request[:id], result: result.response) end request_time = result.request_time if request_time @writer.write(method: "telemetry/event", params: telemetry_params(request, request_time, error)) end end end |
#push(request) ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/ruby_lsp/queue.rb', line 48 def push(request) job = Job.new(request: request, cancelled: false) # Remember a handle to the job, so that we can cancel it @mutex.synchronize do @jobs[request[:id]] = job end @job_queue << job end |
#shutdown ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/ruby_lsp/queue.rb', line 68 def shutdown # Close the queue so that we can no longer receive items @job_queue.close # Clear any remaining jobs so that the thread can terminate @job_queue.clear # Wait until the thread is finished @worker.join end |