Class: Boxxspring::Worker::Base

Inherits:
Object
  • Object
show all
Includes:
Authorization, Logging, Metrics
Defined in:
lib/boxxspring/worker/base.rb

Direct Known Subclasses

TaskBase

Constant Summary

Constants included from Metrics

Metrics::METRICS_CLIENT, Metrics::METRICS_MUTEX, Metrics::METRICS_UPLOAD_INTERVAL

Constants included from Logging

Logging::PWD

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Authorization

#authorize, #authorize_operation

Methods included from Metrics

#initialize, #metric, #metric_defaults, #upload_metrics

Methods included from Logging

#logger

Class Method Details

.environment ⇒ Object



46
47
48
49
50
51
52
# File 'lib/boxxspring/worker/base.rb', line 46

# Resolves the environment name for this worker and memoizes it in
# @environment.
#
# When Worker.env is 'development', the underscored value of the USER
# environment variable is used, falling back to 'development' when USER is
# not set. For any other Worker.env value, that value is returned unchanged.
#
# @return the resolved environment name
def environment
  @environment ||=
    if Worker.env == 'development'
      user = ENV[ 'USER' ]
      # ENV[] yields nil for an unset variable; check before calling
      # underscore so the 'development' fallback is actually reachable.
      user ? user.underscore : 'development'
    else
      Worker.env
    end
end

.process(&block) ⇒ Object



29
30
31
# File 'lib/boxxspring/worker/base.rb', line 29

# Stores the given block as the processor via the processor= writer.
#
# @param block [Proc] the block to assign as the processor
# @return [Proc, nil] the block that was assigned (nil when no block is given)
def process( &block )
  self.processor = block
end

.queue_interface ⇒ Object



33
34
35
# File 'lib/boxxspring/worker/base.rb', line 33

# Returns the SQS client used by this worker. The client is built with
# Aws::SQS::Client.new on the first call and cached in @queue_interface for
# subsequent calls.
#
# @return [Aws::SQS::Client] the cached client instance
def queue_interface
  return @queue_interface if @queue_interface

  @queue_interface = Aws::SQS::Client.new
end

.queue_url ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/boxxspring/worker/base.rb', line 37

# Returns the URL of this worker's queue, cached in @queue_url.
#
# On the first call it invokes create_queue on queue_interface with
# full_queue_name as the queue name and keeps the :queue_url entry of the
# response; later calls return the cached value without another request.
#
# @return the :queue_url value from the create_queue response
def queue_url
  return @queue_url if @queue_url

  created = queue_interface.create_queue( queue_name: full_queue_name )
  @queue_url = created[ :queue_url ]
end

Instance Method Details

#process ⇒ Object


operations



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/boxxspring/worker/base.rb', line 68

# Receives a batch of messages and handles each one inside a metric_defaults
# scope whose dimensions are the worker class name and the environment.
#
# For every message:
# * messages that are not present? are skipped;
# * when payload_from_message yields no usable payload, the message is
#   deleted and an error is logged;
# * otherwise the payload is handed to process_payload inside a :messages
#   metric, and the message is deleted unless the result is exactly false.
#
# A StandardError raised while processing is counted under the :errors
# metric and logged (message at error level, backtrace at info level); it is
# not re-raised, so the remaining messages are still handled.
def process
  dimensions = { worker_name: self.class.name, environment: environment }

  metric_defaults dimensions: dimensions do
    ( receive_messages || [] ).each do | message |
      next unless message.present?

      payload = payload_from_message( message )

      unless payload.present?
        delete_message( message )
        logger.error( "The #{ human_name } received an invalid payload." )
        next
      end

      begin
        metric :messages do
          outcome = process_payload( payload )

          # Only an explicit false keeps the message on the queue.
          # NOTE(review): an earlier comment here said the message is deleted
          # when an exception is raised, but a raise skips this line and the
          # rescue below does not delete it -- confirm the intended behaviour.
          delete_message( message ) unless outcome == false
        end
      rescue StandardError => error
        metric :errors

        logger.error( "The #{ human_name } failed to process the payload." )
        logger.error( error.message )
        logger.info( error.backtrace.join( "\n" ) )
      end
    end
  end
end