Class: Actory::Sender::Dispatcher

Inherits:
Base
  • Object
show all
Defined in:
lib/actory/sender/dispatcher.rb

Constant Summary

Constants inherited from Base

Base::SENDER

Constants inherited from Base

Base::GLOBAL

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

get_logger_level, get_logger_output

Constructor Details

#initialize(actors: []) ⇒ Dispatcher

Returns a new instance of Dispatcher.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/actory/sender/dispatcher.rb', line 7

def initialize(actors: [])
  @actors = []
  @trusted_hosts = []
  @receiver_count = 0
  @system_info = []
  @my_processor_count = Parallel.processor_count
  ret = initial_handshaking(actors)
  raise StandardError if ret == 0
  count = establish_connections
  raise StandardError if count == 0
rescue => e
  @@logger.error Actory::Errors::Generator.new.json(level: "error", message: "Initialization failed.", backtrace: $@)
  exit 1
end

Instance Attribute Details

#actorsObject

Returns the value of attribute actors.



5
6
7
# File 'lib/actory/sender/dispatcher.rb', line 5

def actors
  @actors
end

#my_processor_countObject

Returns the value of attribute my_processor_count.



5
6
7
# File 'lib/actory/sender/dispatcher.rb', line 5

def my_processor_count
  @my_processor_count
end

#receiver_countObject

Returns the value of attribute receiver_count.



5
6
7
# File 'lib/actory/sender/dispatcher.rb', line 5

def receiver_count
  @receiver_count
end

#system_infoObject

Returns the value of attribute system_info.



5
6
7
# File 'lib/actory/sender/dispatcher.rb', line 5

def system_info
  @system_info
end

#trusted_hostsObject

Returns the value of attribute trusted_hosts.



5
6
7
# File 'lib/actory/sender/dispatcher.rb', line 5

def trusted_hosts
  @trusted_hosts
end

Instance Method Details

#message(method, args = [], results = []) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/actory/sender/dispatcher.rb', line 22

def message(method, args=[], results=[])
  args = [nil] if args.empty?
  assignment = assign_jobs(args)

  pbar = ProgressBar.new(method, @receiver_count) if SENDER['show_progress']

  results << Parallel.map(assignment, :in_processes => @receiver_count) do |arg, actor|
    if SENDER['show_progress']
      begin
        pbar.set pbar.current + 1 if pbar.current <= @receiver_count
      rescue
      end
    end

    begin
      actor.send("receive", "reload") if SENDER['reload_receiver_plugins']
      res = actor.send("receive", method, arg)
      sleep SENDER['get_interval']
      ret = res.get
      ret.flatten!
      {actor.address.to_s => ret}
    rescue => e
      @@logger.warn Actory::Errors::Generator.new.json(level: "warn", message: "Something wrong with sending a message to #{actor.address}", backtrace: $@)
      actor = change_actor(actor)
      retry
    end
  end
  results.flatten
end