Class: ActionCable::Server::Worker
- Inherits:
-
Object
- Object
- ActionCable::Server::Worker
show all
- Includes:
- ActiveRecordConnectionManagement, ActiveSupport::Callbacks
- Defined in:
- lib/action_cable/server/worker.rb,
lib/action_cable/server/worker/active_record_connection_management.rb
Overview
Worker used by Server.send_async to do connection work in threads.
Defined Under Namespace
Modules: ActiveRecordConnectionManagement
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#async_exec(receiver, *args, connection:, &block) ⇒ Object
-
#async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
-
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue.
-
#initialize(max_size: 5) ⇒ Worker
constructor
A new instance of Worker.
-
#invoke(receiver, method, *args, connection:, &block) ⇒ Object
-
#stopping? ⇒ Boolean
-
#work(connection, &block) ⇒ Object
#with_database_connections
Constructor Details
#initialize(max_size: 5) ⇒ Worker
21
22
23
24
25
26
27
28
|
# File 'lib/action_cable/server/worker.rb', line 21
def initialize(max_size: 5)
@executor = Concurrent::ThreadPoolExecutor.new(
name: "ActionCable-server",
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
|
Instance Attribute Details
#executor ⇒ Object
Returns the value of attribute executor.
19
20
21
|
# File 'lib/action_cable/server/worker.rb', line 19
def executor
@executor
end
|
Instance Method Details
#async_exec(receiver, *args, connection:, &block) ⇒ Object
48
49
50
|
# File 'lib/action_cable/server/worker.rb', line 48
def async_exec(receiver, *args, connection:, &block)
async_invoke receiver, :instance_exec, *args, connection: connection, &block
end
|
#async_invoke(receiver, method, *args, connection: receiver, &block) ⇒ Object
52
53
54
55
56
|
# File 'lib/action_cable/server/worker.rb', line 52
def async_invoke(receiver, method, *args, connection: receiver, &block)
@executor.post do
invoke(receiver, method, *args, connection: connection, &block)
end
end
|
#halt ⇒ Object
Stop processing work: any work that has not already started running will be discarded from the queue
32
33
34
|
# File 'lib/action_cable/server/worker.rb', line 32
def halt
@executor.shutdown
end
|
#invoke(receiver, method, *args, connection:, &block) ⇒ Object
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/action_cable/server/worker.rb', line 58
def invoke(receiver, method, *args, connection:, &block)
work(connection) do
receiver.send method, *args, &block
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
end
|
#stopping? ⇒ Boolean
36
37
38
|
# File 'lib/action_cable/server/worker.rb', line 36
def stopping?
@executor.shuttingdown?
end
|
#work(connection, &block) ⇒ Object
40
41
42
43
44
45
46
|
# File 'lib/action_cable/server/worker.rb', line 40
def work(connection, &block)
self.connection = connection
run_callbacks :work, &block
ensure
self.connection = nil
end
|