Class: Arsenicum::Core::Broker

Inherits:
Object
  • Object
show all
Includes:
IOHelper
Defined in:
lib/arsenicum/core/broker.rb

Constant Summary collapse

PROCESSOR_COUNT_DEFAULT =
2

Constants included from IOHelper

IOHelper::TYPE_INT, IOHelper::TYPE_STRING

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from IOHelper

#read_message, #write_message

Constructor Details

#initialize(options = {}) ⇒ Broker

Returns a new instance of Broker.



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/arsenicum/core/broker.rb', line 12

def initialize(options = {})
  @worker_count = (options.delete(:worker_count) || PROCESSOR_COUNT_DEFAULT).to_i
  @tasks = {}
  @router = options.delete :router

  serializer = options[:serializer]  ||  Arsenicum::Serializer::JSON.new
  formatter  = options[:formatter]   ||  Arsenicum::Formatter.new
  @worker_options = options.delete(:worker_options) || {}
  @worker_options.merge! serializer: serializer,  formatter: formatter
  @current_worker_index = -1 # because it is incremented whenever used. (primary index should be ZERO)
  @mutex = Mutex.new
end

Instance Attribute Details

#available_workersObject (readonly)

Returns the value of attribute available_workers.



6
7
8
# File 'lib/arsenicum/core/broker.rb', line 6

def available_workers
  @available_workers
end

#default_taskObject

Returns the value of attribute default_task.



8
9
10
# File 'lib/arsenicum/core/broker.rb', line 8

def default_task
  @default_task
end

#mutexObject (readonly)

Returns the value of attribute mutex.



6
7
8
# File 'lib/arsenicum/core/broker.rb', line 6

def mutex
  @mutex
end

#routerObject (readonly)

Returns the value of attribute router.



4
5
6
# File 'lib/arsenicum/core/broker.rb', line 4

def router
  @router
end

#tasksObject (readonly)

Returns the value of attribute tasks.



7
8
9
# File 'lib/arsenicum/core/broker.rb', line 7

def tasks
  @tasks
end

#worker_countObject (readonly)

Returns the value of attribute worker_count.



7
8
9
# File 'lib/arsenicum/core/broker.rb', line 7

def worker_count
  @worker_count
end

#worker_optionsObject (readonly)

Returns the value of attribute worker_options.



7
8
9
# File 'lib/arsenicum/core/broker.rb', line 7

def worker_options
  @worker_options
end

#workersObject (readonly)

Returns the value of attribute workers.



6
7
8
# File 'lib/arsenicum/core/broker.rb', line 6

def workers
  @workers
end

Instance Method Details

#[](task_id) ⇒ Object



25
26
27
# File 'lib/arsenicum/core/broker.rb', line 25

def [](task_id)
  tasks[task_id.to_sym] || default_task
end

#[]=(task_id, task) ⇒ Object Also known as: register



29
30
31
# File 'lib/arsenicum/core/broker.rb', line 29

def []=(task_id, task)
  tasks[task_id.to_sym] = task
end

#broker(success_handler, failure_handler, task_id, *parameters) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/arsenicum/core/broker.rb', line 42

def broker(success_handler, failure_handler, task_id, *parameters)
  worker = loop do
    w = next_worker
    break w if w

    sleep 0.5
  end

  Arsenicum::Logger.info { "[broker][Task brokering]id=#{task_id}, params=#{parameters.inspect}" }
  worker.ask_async success_handler, failure_handler, task_id, *parameters
end

#delegate(message, success_handler, failure_handler) ⇒ Object



54
55
56
57
# File 'lib/arsenicum/core/broker.rb', line 54

def delegate(message, success_handler, failure_handler)
  (task_id, parameters) = router.route(message)
  broker success_handler, failure_handler, task_id, parameters
end

#get_back_worker(worker) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/arsenicum/core/broker.rb', line 77

def get_back_worker(worker)
  if worker.active?
    available_workers << worker
  else
    remove worker
    worker.stop
    prepare_worker
  end
end

#reloadObject



68
69
70
71
72
73
74
75
# File 'lib/arsenicum/core/broker.rb', line 68

def reload
  stop

  available_workers.clear
  workers.clear

  prepare_workers
end

#remove(worker) ⇒ Object



63
64
65
66
# File 'lib/arsenicum/core/broker.rb', line 63

def remove(worker)
  available_workers.delete(worker)
  workers.delete(worker)
end

#runObject



35
36
37
38
39
40
# File 'lib/arsenicum/core/broker.rb', line 35

def run
  @workers = []
  @available_workers = []

  prepare_workers
end

#stopObject



59
60
61
# File 'lib/arsenicum/core/broker.rb', line 59

def stop
  workers.each(&:stop)
end