Class: HerdstWorker::Queue::Processor

Inherits:
Runner
  • Object
show all
Defined in:
lib/herdst_worker/queue/processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Runner

#execute_message!, #process_message!

Constructor Details

#initialize(app, enabled, runtime, queue_url, queue_wait_time) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/herdst_worker/queue/processor.rb', line 17

def initialize(app, enabled, runtime, queue_url, queue_wait_time)
    self.app = app
    self.enabled = enabled
    self.queue_url = queue_url
    self.queue_wait_time = queue_wait_time
    self.poller = Aws::SQS::QueuePoller.new(queue_url)
    self.job_count = 0
    self.max_jobs = 10
    self.attempt_threshold = 6
    self.visibility_timeout = 15
    self.ignored_notifications = [
        "AmazonSnsSubscriptionSucceeded"
    ]

    self.processor_status = "starting"
    self.processor_expired = false
    self.run_duration = runtime
    self.reset_time

    # Start the processor as working
    self.set_status "starting"

    # Log queue stats
    self.poller.before_request do |stats|
        before_request(stats)
    end
end

Instance Attribute Details

#appObject

Returns the value of attribute app.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

def app
  @app
end

#attempt_thresholdObject

Returns the value of attribute attempt_threshold.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

def attempt_threshold
  @attempt_threshold
end

#enabledObject

Returns the value of attribute enabled.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

def enabled
  @enabled
end

#expire_atObject

Returns the value of attribute expire_at.



12
13
14
# File 'lib/herdst_worker/queue/processor.rb', line 12

def expire_at
  @expire_at
end

#ignored_notificationsObject

Returns the value of attribute ignored_notifications.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

def ignored_notifications
  @ignored_notifications
end

#job_countObject

Returns the value of attribute job_count.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

def job_count
  @job_count
end

#max_jobsObject

Returns the value of attribute max_jobs.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

def max_jobs
  @max_jobs
end

#pollerObject

Returns the value of attribute poller.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

def poller
  @poller
end

#processor_expiredObject

Returns the value of attribute processor_expired.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

def processor_expired
  @processor_expired
end

#processor_statusObject

Returns the value of attribute processor_status.



13
14
15
# File 'lib/herdst_worker/queue/processor.rb', line 13

def processor_status
  @processor_status
end

#queue_urlObject

Returns the value of attribute queue_url.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

def queue_url
  @queue_url
end

#queue_wait_timeObject

Returns the value of attribute queue_wait_time.



11
12
13
# File 'lib/herdst_worker/queue/processor.rb', line 11

def queue_wait_time
  @queue_wait_time
end

#run_durationObject

Returns the value of attribute run_duration.



12
13
14
# File 'lib/herdst_worker/queue/processor.rb', line 12

def run_duration
  @run_duration
end

#start_timeObject

Returns the value of attribute start_time.



12
13
14
# File 'lib/herdst_worker/queue/processor.rb', line 12

def start_time
  @start_time
end

#visibility_timeoutObject

Returns the value of attribute visibility_timeout.



14
15
16
# File 'lib/herdst_worker/queue/processor.rb', line 14

def visibility_timeout
  @visibility_timeout
end

Instance Method Details

#before_request(stats) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/herdst_worker/queue/processor.rb', line 101

def before_request(stats)
    if self.app.config.is_dev?
        self.app.logger.queue_stats.info "STATS (#{self.processor_status}): #{stats.inspect}"
    end
    
    if self.poller.client.config.credentials.has_expired?
        self.poller.client.config.credentials = self.app.config..get_aws_credentials
    end
    
    # After hours of running terminate application. 
    # The app will automatically restart in production
    current_time = Time.now.utc.to_i
    
    if (self.processor_status == "working") && (current_time >= self.expire_at) && (self.processor_expired == false)
        self.processor_expired = true
        self.app.force_stop(900)
        self.app.register_new_task
    end

    # On stopping wait for jobs to complete and then set status
    # to stopped. Once stopped the polling will terminate.
    if self.processor_status == "stopping"
        if self.job_count == 0
            self.app.logger.queue.warn "Setting processor status to stopped"
            set_status "stopped"
        end
    end
    
    # Once stopped exit the application
    if self.processor_status == "stopped"
        self.app.logger.queue.warn "Stopping polling, Service requested to stop"

        throw :stop_polling
    end
end

#process_message(msg) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/herdst_worker/queue/processor.rb', line 138

def process_message(msg)
    if self.processor_status == "working"
        # If the app is already processing the max number of jobs 
        # put the message back in the queue with a short wait time
        if self.job_count >= self.max_jobs
            self.poller.change_message_visibility_timeout(msg, self.visibility_timeout)
            throw :skip_delete
        end
        
        # Find out how many attempts there has been already for 
        # the message.
        msg_attrs = msg.message_attributes.dup
        attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
        will_fail_permanently = attempt_number > self.attempt_threshold
        
        # Run the job and increase the job count
        # Once successful the job count is decreased by one
        # and the message is deleted.
        # If an error occured the job count is decreased by 
        # one and the error is logged locally and with sentry
        self.job_count += 1
        message = JSON.parse(msg.body)
        process_message!(message, msg, will_fail_permanently).then {
            self.job_count -= 1
            
        }.rescue { |ex|
            begin
                if will_fail_permanently
                    self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{ex.backtrace.join("\n")}"
                    Sentry.capture_exception(ex, {
                        :level => "fatal",
                        :extra => {
                            "queue_attempts" => attempt_number,
                            "queue_message_body" => msg.body
                        }
                    })

                else
                    self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."
                    
                    self.poller.client.send_message({
                        :queue_url => self.poller.queue_url,
                        :message_body => msg.body,
                        :delay_seconds => self.visibility_timeout,
                        :message_attributes => msg_attrs.merge({
                            "attempts" => {
                                :string_value => attempt_number.to_s,
                                :data_type => "Number"
                            }
                        })
                    })
                end
                
                if self.app.config.is_dev?
                    self.app.logger.queue.error "Processor Error:"
                    self.app.logger.queue.error ex.message
                    self.app.logger.queue.error ex.backtrace
                end

            rescue Exception => inner_ex
                self.app.logger.queue.error inner_ex.message

            ensure
                self.job_count -= 1

            end
        }.execute
    else
        self.poller.change_message_visibility_timeout(msg, 5)
        throw :skip_delete

    end
end

#set_status(status) ⇒ Object

Set the processor status. The status is alos logged to file so services like capastranio can see the current status



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/herdst_worker/queue/processor.rb', line 85

def set_status(status)
    statuses = ["starting", "idle", "working", "finishing", "stopping", "stopped"]
    
    if statuses.include? status
        # Set status
        self.processor_status = status
        
        # Write the current status to file for capastranio to use
        process_file = self.app.config.paths.temp + "/process_status"
        File.open(process_file, "w") { |file| file.write(status) }
    else
        raise "Invalid status (#{status})"
    end
end

#startObject

Starts or resets the application to a working status



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/herdst_worker/queue/processor.rb', line 61

def start
    if self.processor_status == "starting"
        self.set_status "working"
        self.reset_time
        self.start_poller
    else
        return if self.processor_status == "working"
        
        self.set_status "working"
        self.reset_time
    end
end

#start_pollerObject

Runs the poller



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/herdst_worker/queue/processor.rb', line 47

def start_poller
    if self.enabled
        self.poller.poll(:wait_time_seconds => self.queue_wait_time, :skip_delete => false) do |msg|
            process_message(msg)
        end
        
        self.app.deregister_task
    else
        raise "Cannot start a queue which is not enabled"
    end
end

#stopObject

Sets the processor status to stopping. The sqs before action will take care of stopping the application once all jobs have finished.



77
78
79
80
# File 'lib/herdst_worker/queue/processor.rb', line 77

def stop
    return if self.processor_status == "stopping"
    set_status "stopping"
end