Class: RubyLsp::Queue

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/ruby_lsp/queue.rb

Defined Under Namespace

Classes: Job, Result

Instance Method Summary collapse

Constructor Details

#initialize(writer, handlers) ⇒ Queue

Returns a new instance of Queue.



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/ruby_lsp/queue.rb', line 34

def initialize(writer, handlers)
  @writer = writer
  @handlers = handlers
  # The job queue is the actual list of requests we have to process
  @job_queue = T.let(Thread::Queue.new, Thread::Queue)
  # The jobs hash is just a way of keeping a handle to jobs based on the request ID, so we can cancel them
  @jobs = T.let({}, T::Hash[T.any(String, Integer), Job])
  @mutex = T.let(Mutex.new, Mutex)
  @worker = T.let(new_worker, Thread)

  Thread.main.priority = 1
end

Instance Method Details

#cancel(id) ⇒ Object



60
61
62
63
64
65
# File 'lib/ruby_lsp/queue.rb', line 60

def cancel(id)
  @mutex.synchronize do
    # Cancel the job if it's still in the queue
    @jobs[id]&.cancel
  end
end

#execute(request) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/ruby_lsp/queue.rb', line 80

def execute(request)
  response = T.let(nil, T.untyped)
  error = T.let(nil, T.nilable(Exception))

  request_time = Benchmark.realtime do
    response = T.must(@handlers[request[:method]]).action.call(request)
  rescue StandardError, LoadError => e
    error = e
  end

  Queue::Result.new(response: response, error: error, request_time: request_time)
end

#finalize_request(result, request) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/ruby_lsp/queue.rb', line 100

def finalize_request(result, request)
  @mutex.synchronize do
    error = result.error
    if error
      T.must(@handlers[request[:method]]).error_handler&.call(error, request)

      @writer.write(
        id: request[:id],
        error: {
          code: LanguageServer::Protocol::Constant::ErrorCodes::INTERNAL_ERROR,
          message: error.inspect,
          data: request.to_json,
        },
      )
    elsif result.response != Handler::VOID
      @writer.write(id: request[:id], result: result.response)
    end

    request_time = result.request_time
    if request_time
      @writer.write(method: "telemetry/event", params: telemetry_params(request, request_time, error))
    end
  end
end

#push(request) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/ruby_lsp/queue.rb', line 48

def push(request)
  job = Job.new(request: request, cancelled: false)

  # Remember a handle to the job, so that we can cancel it
  @mutex.synchronize do
    @jobs[request[:id]] = job
  end

  @job_queue << job
end

#shutdownObject



68
69
70
71
72
73
74
75
# File 'lib/ruby_lsp/queue.rb', line 68

def shutdown
  # Close the queue so that we can no longer receive items
  @job_queue.close
  # Clear any remaining jobs so that the thread can terminate
  @job_queue.clear
  # Wait until the thread is finished
  @worker.join
end