Class: Arsenicum::Core::Broker
- Inherits:
-
Object
- Object
- Arsenicum::Core::Broker
- Includes:
- IOHelper
- Defined in:
- lib/arsenicum/core/broker.rb
Constant Summary collapse
- PROCESSOR_COUNT_DEFAULT =
2
Constants included from IOHelper
IOHelper::TYPE_INT, IOHelper::TYPE_STRING
Instance Attribute Summary collapse
-
#available_workers ⇒ Object
readonly
Returns the value of attribute available_workers.
-
#default_task ⇒ Object
Returns the value of attribute default_task.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#router ⇒ Object
readonly
Returns the value of attribute router.
-
#tasks ⇒ Object
readonly
Returns the value of attribute tasks.
-
#worker_count ⇒ Object
readonly
Returns the value of attribute worker_count.
-
#worker_options ⇒ Object
readonly
Returns the value of attribute worker_options.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
- #[](task_id) ⇒ Object
- #[]=(task_id, task) ⇒ Object (also: #register)
- #broker(success_handler, failure_handler, task_id, *parameters) ⇒ Object
- #delegate(message, success_handler, failure_handler) ⇒ Object
- #get_back_worker(worker) ⇒ Object
-
#initialize(options = {}) ⇒ Broker
constructor
A new instance of Broker.
- #reload ⇒ Object
- #remove(worker) ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
Methods included from IOHelper
Constructor Details
#initialize(options = {}) ⇒ Broker
Returns a new instance of Broker.
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/arsenicum/core/broker.rb', line 12 def initialize( = {}) @worker_count = (.delete(:worker_count) || PROCESSOR_COUNT_DEFAULT).to_i @tasks = {} @router = .delete :router serializer = [:serializer] || Arsenicum::Serializer::JSON.new formatter = [:formatter] || Arsenicum::Formatter.new @worker_options = .delete(:worker_options) || {} @worker_options.merge! serializer: serializer, formatter: formatter @current_worker_index = -1 # because it is incremented whenever used. (primary index should be ZERO) @mutex = Mutex.new end |
Instance Attribute Details
#available_workers ⇒ Object (readonly)
Returns the value of attribute available_workers.
6 7 8 |
# File 'lib/arsenicum/core/broker.rb', line 6 def available_workers @available_workers end |
#default_task ⇒ Object
Returns the value of attribute default_task.
8 9 10 |
# File 'lib/arsenicum/core/broker.rb', line 8 def default_task @default_task end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
6 7 8 |
# File 'lib/arsenicum/core/broker.rb', line 6 def mutex @mutex end |
#router ⇒ Object (readonly)
Returns the value of attribute router.
4 5 6 |
# File 'lib/arsenicum/core/broker.rb', line 4 def router @router end |
#tasks ⇒ Object (readonly)
Returns the value of attribute tasks.
7 8 9 |
# File 'lib/arsenicum/core/broker.rb', line 7 def tasks @tasks end |
#worker_count ⇒ Object (readonly)
Returns the value of attribute worker_count.
7 8 9 |
# File 'lib/arsenicum/core/broker.rb', line 7 def worker_count @worker_count end |
#worker_options ⇒ Object (readonly)
Returns the value of attribute worker_options.
7 8 9 |
# File 'lib/arsenicum/core/broker.rb', line 7 def @worker_options end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
6 7 8 |
# File 'lib/arsenicum/core/broker.rb', line 6 def workers @workers end |
Instance Method Details
#[](task_id) ⇒ Object
25 26 27 |
# File 'lib/arsenicum/core/broker.rb', line 25 def [](task_id) tasks[task_id.to_sym] || default_task end |
#[]=(task_id, task) ⇒ Object Also known as: register
29 30 31 |
# File 'lib/arsenicum/core/broker.rb', line 29 def []=(task_id, task) tasks[task_id.to_sym] = task end |
#broker(success_handler, failure_handler, task_id, *parameters) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/arsenicum/core/broker.rb', line 42 def broker(success_handler, failure_handler, task_id, *parameters) worker = loop do w = next_worker break w if w sleep 0.5 end Arsenicum::Logger.info { "[broker][Task brokering]id=#{task_id}, params=#{parameters.inspect}" } worker.ask_async success_handler, failure_handler, task_id, *parameters end |
#delegate(message, success_handler, failure_handler) ⇒ Object
54 55 56 57 |
# File 'lib/arsenicum/core/broker.rb', line 54 def delegate(, success_handler, failure_handler) (task_id, parameters) = router.route() broker success_handler, failure_handler, task_id, parameters end |
#get_back_worker(worker) ⇒ Object
77 78 79 80 81 82 83 84 85 |
# File 'lib/arsenicum/core/broker.rb', line 77 def get_back_worker(worker) if worker.active? available_workers << worker else remove worker worker.stop prepare_worker end end |
#reload ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/arsenicum/core/broker.rb', line 68 def reload stop available_workers.clear workers.clear prepare_workers end |
#remove(worker) ⇒ Object
63 64 65 66 |
# File 'lib/arsenicum/core/broker.rb', line 63 def remove(worker) available_workers.delete(worker) workers.delete(worker) end |
#run ⇒ Object
35 36 37 38 39 40 |
# File 'lib/arsenicum/core/broker.rb', line 35 def run @workers = [] @available_workers = [] prepare_workers end |
#stop ⇒ Object
59 60 61 |
# File 'lib/arsenicum/core/broker.rb', line 59 def stop workers.each(&:stop) end |