Class: DeepTest::Agent

Inherits:
Object
  • Object
show all
Includes:
Demon
Defined in:
lib/deep_test/agent.rb

Defined Under Namespace

Classes: Error

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Demon

#forked

Constructor Details

#initialize(number, options, listener) ⇒ Agent

Returns a new instance of Agent.



6
7
8
9
10
# File 'lib/deep_test/agent.rb', line 6

# Builds an agent from the collaborators it is handed.
#
# @param number [Object] value later returned by #number
# @param options [Object] stored in @options; the other methods here call
#   origin_hostname, server_port and connect_to_central_command on it
# @param listener [Object] stored in @listener; #execute sends it
#   starting, starting_work and finished_work
def initialize(number, options, listener)
  @listener = listener
  @number = number
  @options = options
end

Instance Attribute Details

#number ⇒ Object (readonly)

Returns the value of attribute number.



4
5
6
# File 'lib/deep_test/agent.rb', line 4

# Reader for the agent's number.
#
# @return [Object] the current value of @number (set in #initialize)
def number
  @number
end

Instance Method Details

#connect(stream_to_parent_process) ⇒ Object



12
13
14
15
16
17
18
19
20
21
# File 'lib/deep_test/agent.rb', line 12

# Connects to central command via @options and hands the resulting wire to
# the caller's block.
#
# As soon as the connection block runs, "Connected" is written to
# +stream_to_parent_process+ and the stream is closed before the wire is
# yielded.
#
# @param stream_to_parent_process [IO] stream that receives the "Connected"
#   line; it is always closed by the time this method exits
# @yieldparam wire [Object] the object yielded by
#   @options.connect_to_central_command
# @return [Object] whatever @options.connect_to_central_command returns
def connect(stream_to_parent_process)
  DeepTest.logger.debug do
    "Agent: Connecting to #{@options.origin_hostname}:#{@options.server_port}"
  end

  @options.connect_to_central_command do |wire|
    stream_to_parent_process.puts("Connected")

    # Best-effort close: a StandardError raised here is deliberately ignored.
    begin
      stream_to_parent_process.close
    rescue StandardError
      nil
    end

    yield wire
  end
ensure
  # Covers every exit path, including failures before the connection block ran.
  unless stream_to_parent_process.closed?
    stream_to_parent_process.close
  end
end

#execute(stream_from_child_process, stream_to_parent_process) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/deep_test/agent.rb', line 23

# Main work loop of the agent.
#
# Closes +stream_from_child_process+, connects through #connect, notifies the
# listener, asks for work with CentralCommand::NeedWork, then runs work units
# until #next_work_unit_message returns a falsy value. Each result is reported
# to the listener and sent back with #send_result.
#
# @param stream_from_child_process [IO] closed immediately
# @param stream_to_parent_process [IO] forwarded to #connect
# @return [Object] the value of #connect, or of the debug log call when
#   CentralCommand::NoWorkUnitsRemainingError was rescued
def execute(stream_from_child_process, stream_to_parent_process)
  stream_from_child_process.close
  connect(stream_to_parent_process) do |wire|
    reseed_random_numbers
    reconnect_to_database

    @listener.starting(self)
    wire.send_message(CentralCommand::NeedWork)

    while (message = next_work_unit_message(wire))
      @listener.starting_work(self, message.body)

      # Exception (not just StandardError) is rescued on purpose here: anything
      # raised while running the work unit is wrapped in Error and reported as
      # that unit's result instead of terminating the loop.
      result =
        begin
          Metrics::Measurement.send_home("Agents Performing Work", wire, @options) { message.body.run }
        rescue Exception => failure
          Error.new(message.body, failure)
        end

      @listener.finished_work(self, message.body, result)
      send_result(wire, message, result)
    end
  end
rescue CentralCommand::NoWorkUnitsRemainingError
  DeepTest.logger.debug { "Agent #{number}: no more work to do" }
end

#next_work_unit_message(wire) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/deep_test/agent.rb', line 51

# Fetches the next message from +wire+, measured under
# "Agents Retrieving Work".
#
# @param wire [Object] must respond to next_message(:timeout => seconds)
# @return [Object, nil] the received message, or nil when its body equals
#   CentralCommand::NoMoreWork or the wire raised Telegraph::LineDead
#   (assuming Metrics::Measurement.send_home returns its block's value —
#   TODO confirm)
def next_work_unit_message(wire)
  Metrics::Measurement.send_home("Agents Retrieving Work", wire, @options) do
    begin
      incoming = wire.next_message(:timeout => 2)
      if incoming.body == CentralCommand::NoMoreWork
        nil
      else
        incoming
      end
    rescue Telegraph::NoMessageAvailable
      # Nothing arrived within the timeout: keep polling, with no retry limit.
      DeepTest.logger.debug { "Agent: NoMessageAvailable" }
      retry
    rescue Telegraph::LineDead
      DeepTest.logger.debug { "Agent: LineDead" }
      nil
    end
  end
end

#reconnect_to_database ⇒ Object



70
71
72
# File 'lib/deep_test/agent.rb', line 70

# Reconnects the ActiveRecord connection, but only when ActiveRecord::Base is
# defined; otherwise does nothing.
# NOTE(review): presumably needed because the agent runs in a forked process
# and should not reuse an inherited connection — confirm.
#
# @return [Object, nil] the result of reconnect!, or nil when
#   ActiveRecord::Base is not defined
def reconnect_to_database
  return unless defined?(ActiveRecord::Base)

  ActiveRecord::Base.connection.reconnect!
end

#reseed_random_numbers ⇒ Object



74
75
76
# File 'lib/deep_test/agent.rb', line 74

# Re-seeds Ruby's default random number generator with a fresh seed by
# calling srand with no argument.
#
# @return [Integer] the previous seed, as returned by srand
def reseed_random_numbers
  Kernel.srand
end

#send_result(wire, work_unit_message, result) ⇒ Object



66
67
68
# File 'lib/deep_test/agent.rb', line 66

# Sends +result+ over +wire+, passing the originating work-unit message as
# the :ack option.
#
# @param wire [Object] must respond to send_message(body, options)
# @param work_unit_message [Object] the message being acknowledged
# @param result [Object] the outcome to send
# @return [Object] whatever wire.send_message returns
def send_result(wire, work_unit_message, result)
  wire.send_message(result, :ack => work_unit_message)
end