Class: ActionCable::Server::Worker
- Inherits:
-
Object
- Object
- ActionCable::Server::Worker
show all
- Includes:
- ActiveRecordConnectionManagement, ActiveSupport::Callbacks
- Defined in:
- lib/action_cable/server/worker.rb,
lib/action_cable/server/worker/active_record_connection_management.rb
Overview
Worker used by Server.send_async to do connection work in threads.
Defined Under Namespace
Modules: ActiveRecordConnectionManagement
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#async_exec(receiver, *args, connection:, &block) ⇒ Object
-
#async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
-
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue.
-
#initialize(max_size: 5) ⇒ Worker
constructor
A new instance of Worker.
-
#invoke(receiver, method, *args, connection:, &block) ⇒ Object
-
#stopping? ⇒ Boolean
-
#work(connection) ⇒ Object
#with_database_connections
Constructor Details
#initialize(max_size: 5) ⇒ Worker
17
18
19
20
21
22
23
|
# File 'lib/action_cable/server/worker.rb', line 17
# Builds the worker and the thread pool that runs its jobs.
#
# @param max_size [Integer] upper bound on the number of pool threads
#   (passed as +max_threads+); the pool is asked to keep at least one thread.
def initialize(max_size: 5)
  pool_options = {
    min_threads: 1,
    max_threads: max_size,
    max_queue: 0
  }

  @executor = Concurrent::ThreadPoolExecutor.new(**pool_options)
end
|
Instance Attribute Details
#executor ⇒ Object
Returns the value of attribute executor.
15
16
17
|
# File 'lib/action_cable/server/worker.rb', line 15
# Reader for the +@executor+ instance variable, which the constructor
# populates with a Concurrent::ThreadPoolExecutor.
#
# @return [Object] the current value of +@executor+
def executor
@executor
end
|
Instance Method Details
#async_exec(receiver, *args, connection:, &block) ⇒ Object
45
46
47
|
# File 'lib/action_cable/server/worker.rb', line 45
# Schedules +block+ to be evaluated via +instance_exec+ on +receiver+.
#
# This is a thin wrapper: it forwards to #async_invoke with
# +:instance_exec+ as the method name, passing +args+ and +block+ through.
#
# @param receiver [Object] object the block will be instance_exec'd on
# @param args [Array] arguments forwarded alongside the block
# @param connection [Object] forwarded unchanged as +connection:+
# @return [Object] whatever #async_invoke returns
def async_exec(receiver, *args, connection:, &block)
  async_invoke(receiver, :instance_exec, *args, connection: connection, &block)
end
|
#async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
49
50
51
52
53
|
# File 'lib/action_cable/server/worker.rb', line 49
# Posts a job to the executor that calls #invoke with the given arguments.
#
# @param receiver [Object] object the method will be sent to
# @param method [Symbol, String] name of the method to call
# @param args [Array] arguments forwarded to the method
# @param connection [Object] forwarded to #invoke; defaults to +receiver+
# @return [Object] whatever the executor's +post+ returns
def async_invoke(receiver, method, *args, connection: receiver, &block)
  @executor.post { invoke(receiver, method, *args, connection: connection, &block) }
end
|
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue.
27
28
29
|
# File 'lib/action_cable/server/worker.rb', line 27
# Stops processing work by calling +kill+ on the executor. Per this class's
# documentation, work that has not already started running is discarded
# from the queue.
#
# @return [Object] whatever the executor's +kill+ returns
def halt
@executor.kill
end
|
#invoke(receiver, method, *args, connection:, &block) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/action_cable/server/worker.rb', line 55
# Calls +method+ on +receiver+ inside #work, logging any exception raised.
#
# On failure the exception class, message and backtrace are written to the
# logger at error level, and +receiver.handle_exception+ is called when the
# receiver responds to it. The exception is not re-raised.
#
# @param receiver [Object] object the method is sent to
# @param method [Symbol, String] name of the method to send
# @param args [Array] arguments forwarded to the method
# @param connection [Object] passed to #work for the duration of the call
# @return [Object] the result of #work
def invoke(receiver, method, *args, connection:, &block)
  work(connection) do
    begin
      receiver.send(method, *args, &block)
    # NOTE(review): rescues Exception rather than StandardError, presumably
    # so that nothing escapes the worker thread — confirm this is intended.
    rescue Exception => e
      logger.error("There was an exception - #{e.class}(#{e.message})")
      logger.error(e.backtrace.join("\n"))

      if receiver.respond_to?(:handle_exception)
        receiver.handle_exception
      end
    end
  end
end
|
#stopping? ⇒ Boolean
31
32
33
|
# File 'lib/action_cable/server/worker.rb', line 31
# Reports whether the executor is shutting down, by delegating to the
# executor's +shuttingdown?+ predicate.
#
# @return [Boolean] the value returned by +@executor.shuttingdown?+
def stopping?
@executor.shuttingdown?
end
|
#work(connection) ⇒ Object
35
36
37
38
39
40
41
42
43
|
# File 'lib/action_cable/server/worker.rb', line 35
# Runs the given block wrapped in the +:work+ callbacks, with
# +self.connection+ set to +connection+ for the duration of the call.
#
# +self.connection+ is reset to +nil+ afterwards, even when the block or a
# callback raises.
#
# @param connection [Object] value assigned to +self.connection+ while working
# @yield the work to perform inside the callbacks
# @return [Object] the result of +run_callbacks+
def work(connection)
  self.connection = connection
  run_callbacks(:work) { yield }
ensure
  self.connection = nil
end
|