Class: Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/wattics-api-client/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(agent) ⇒ Processor

Returns a new instance of Processor.



5
6
7
8
9
10
11
12
13
# File 'lib/wattics-api-client/processor.rb', line 5

def initialize(agent)
  @agent = agent
  @measurements_with_config = PriorityBlockingQueue.new
  @semaphore = Concurrent::Semaphore.new(0)
  @is_sending = false
  @mutex = Mutex.new
  @logger = Logger.new(STDOUT)
  @logger.level = Logger::WARN
end

Instance Method Details

#is_idle?Boolean

Returns:

  • (Boolean)


24
25
26
27
28
# File 'lib/wattics-api-client/processor.rb', line 24

def is_idle?
  @mutex.synchronize do
    @measurements_with_config.is_empty? && !@is_sending
  end
end

#process(measurement_with_config) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/wattics-api-client/processor.rb', line 15

def process(measurement_with_config)
  @measurements_with_config << measurement_with_config
  if measurement_with_config.is_a?(Array)
    @semaphore.release(measurement_with_config.size)
  else
    @semaphore.release
  end
end

#runObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/wattics-api-client/processor.rb', line 30

def run
  client = ClientFactory.get_instance.create_client
  loop do
    @semaphore.acquire
    @mutex.synchronize do
      @measurement_with_config = @measurements_with_config.pop
      @is_sending = true
    end
    @measurement = @measurement_with_config.measurement
    @config = @measurement_with_config.config
    loop do
      begin
        @response = client.send(@measurement, @config)
        if !@agent.nil? && @response.code < 400
          @agent.report_sent_measurement(@measurement, @response)
        end
        if !@agent.nil? && @response.code >= 400
          @logger.error("Could not send #{@measurement}, Server Response: #{Nokogiri::HTML(@response.body).xpath('//h1').text}")
        end
        break
      rescue StandardError => e
        @logger.error("Could not send #{@measurement}, Server Response: #{e}")
        sleep 60
      end
    end
    @mutex.synchronize do
      @is_sending = false
    end
  end
rescue StandardError => e
  @logger.error("Thread stopped unexpectedly: #{e.message}")
end