Class: HerdstWorker::Queue::Processor
- Defined in:
- lib/herdst_worker/queue/processor.rb
Instance Attribute Summary collapse
-
#app ⇒ Object
Returns the value of attribute app.
-
#attempt_threshold ⇒ Object
Returns the value of attribute attempt_threshold.
-
#enabled ⇒ Object
Returns the value of attribute enabled.
-
#expire_at ⇒ Object
Returns the value of attribute expire_at.
-
#ignored_notifications ⇒ Object
Returns the value of attribute ignored_notifications.
-
#job_count ⇒ Object
Returns the value of attribute job_count.
-
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
-
#poller ⇒ Object
Returns the value of attribute poller.
-
#processor_expired ⇒ Object
Returns the value of attribute processor_expired.
-
#processor_status ⇒ Object
Returns the value of attribute processor_status.
-
#queue_url ⇒ Object
Returns the value of attribute queue_url.
-
#queue_wait_time ⇒ Object
Returns the value of attribute queue_wait_time.
-
#run_duration ⇒ Object
Returns the value of attribute run_duration.
-
#start_time ⇒ Object
Returns the value of attribute start_time.
-
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
Instance Method Summary collapse
- #before_request(stats) ⇒ Object
-
#initialize(app, enabled, runtime, queue_url, queue_wait_time) ⇒ Processor
constructor
A new instance of Processor.
- #process_message(msg) ⇒ Object
-
#set_status(status) ⇒ Object
Set the processor status.
-
#start ⇒ Object
Starts or resets the application to a working status.
-
#start_poller ⇒ Object
Runs the poller.
-
#stop ⇒ Object
Sets the processor status to stopping.
Methods inherited from Runner
#execute_message!, #process_message!
Constructor Details
#initialize(app, enabled, runtime, queue_url, queue_wait_time) ⇒ Processor
Returns a new instance of Processor.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/herdst_worker/queue/processor.rb', line 17

# Builds a processor for a single SQS queue and leaves it in the
# "starting" state with the stats/shutdown hook attached to the poller.
#
# @param app the application object; this class reads its config, logger
#   and task registration methods
# @param enabled truthy when polling is allowed (checked by #start_poller)
# @param runtime stored as run_duration
# @param queue_url URL handed to Aws::SQS::QueuePoller.new
# @param queue_wait_time forwarded to the poller as :wait_time_seconds
def initialize(app, enabled, runtime, queue_url, queue_wait_time)
  self.app = app
  self.enabled = enabled
  self.queue_url = queue_url
  self.queue_wait_time = queue_wait_time
  self.poller = Aws::SQS::QueuePoller.new(queue_url)

  # Defaults for throttling and retry handling
  self.job_count = 0
  self.max_jobs = 10
  self.attempt_threshold = 6
  self.visibility_timeout = 15
  self.ignored_notifications = [
    "AmazonSnsSubscriptionSucceeded"
  ]

  self.processor_status = "starting"
  self.processor_expired = false
  self.run_duration = runtime
  self.reset_time

  # Record the initial "starting" status (set_status also writes it to
  # the status file)
  self.set_status "starting"

  # Run #before_request ahead of every poll request: logs queue stats and
  # drives expiry / shutdown handling
  self.poller.before_request do |stats|
    before_request(stats)
  end
end
Instance Attribute Details
#app ⇒ Object
Returns the value of attribute app.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11

# The application object handed to the constructor.
#
# @return [Object] the value of attribute app
def app
  @app
end
#attempt_threshold ⇒ Object
Returns the value of attribute attempt_threshold.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14

# Number of attempts after which #process_message treats a failing
# message as permanently failed (the constructor sets it to 6).
#
# @return [Object] the value of attribute attempt_threshold
def attempt_threshold
  @attempt_threshold
end
#enabled ⇒ Object
Returns the value of attribute enabled.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Whether the queue may be polled; #start_poller raises when this is falsy.
#
# @return [Object] the value of attribute enabled
def enabled
  @enabled
end
#expire_at ⇒ Object
Returns the value of attribute expire_at.
12 13 14 |
# File 'lib/herdst_worker/queue/processor.rb', line 12

# Point in time that #before_request compares against the current UTC
# epoch seconds to decide when the processor has expired.
# NOTE(review): presumably assigned by reset_time - confirm.
#
# @return [Object] the value of attribute expire_at
def expire_at
  @expire_at
end
#ignored_notifications ⇒ Object
Returns the value of attribute ignored_notifications.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14

# List of notification names; the constructor sets it to
# ["AmazonSnsSubscriptionSucceeded"].
#
# @return [Object] the value of attribute ignored_notifications
def ignored_notifications
  @ignored_notifications
end
#job_count ⇒ Object
Returns the value of attribute job_count.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Number of jobs currently running; #process_message increments it when a
# job starts and decrements it when the job succeeds or fails.
#
# @return [Object] the value of attribute job_count
def job_count
  @job_count
end
#max_jobs ⇒ Object
Returns the value of attribute max_jobs.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Upper bound on concurrently running jobs (the constructor sets it to 10);
# #process_message defers new messages once job_count reaches it.
#
# @return [Object] the value of attribute max_jobs
def max_jobs
  @max_jobs
end
#poller ⇒ Object
Returns the value of attribute poller.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11

# The Aws::SQS::QueuePoller the constructor creates for queue_url.
#
# @return [Object] the value of attribute poller
def poller
  @poller
end
#processor_expired ⇒ Object
Returns the value of attribute processor_expired.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Flag that #before_request sets to true once it has triggered the expiry
# shutdown, so the shutdown is only requested once. Starts as false.
#
# @return [Object] the value of attribute processor_expired
def processor_expired
  @processor_expired
end
#processor_status ⇒ Object
Returns the value of attribute processor_status.
13 14 15 |
# File 'lib/herdst_worker/queue/processor.rb', line 13

# Current lifecycle status; #set_status restricts it to "starting", "idle",
# "working", "finishing", "stopping" or "stopped".
#
# @return [Object] the value of attribute processor_status
def processor_status
  @processor_status
end
#queue_url ⇒ Object
Returns the value of attribute queue_url.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11

# The queue URL handed to the constructor and used to build the poller.
#
# @return [Object] the value of attribute queue_url
def queue_url
  @queue_url
end
#queue_wait_time ⇒ Object
Returns the value of attribute queue_wait_time.
11 12 13 |
# File 'lib/herdst_worker/queue/processor.rb', line 11

# Value #start_poller passes to the poller as :wait_time_seconds.
#
# @return [Object] the value of attribute queue_wait_time
def queue_wait_time
  @queue_wait_time
end
#run_duration ⇒ Object
Returns the value of attribute run_duration.
12 13 14 |
# File 'lib/herdst_worker/queue/processor.rb', line 12

# The `runtime` value handed to the constructor.
# NOTE(review): presumably used by reset_time to compute expire_at - confirm.
#
# @return [Object] the value of attribute run_duration
def run_duration
  @run_duration
end
#start_time ⇒ Object
Returns the value of attribute start_time.
12 13 14 |
# File 'lib/herdst_worker/queue/processor.rb', line 12

# NOTE(review): presumably the time recorded by reset_time - confirm.
#
# @return [Object] the value of attribute start_time
def start_time
  @start_time
end
#visibility_timeout ⇒ Object
Returns the value of attribute visibility_timeout.
14 15 16 |
# File 'lib/herdst_worker/queue/processor.rb', line 14

# Delay (the constructor sets it to 15) that #process_message applies when
# it defers a message at capacity and when it re-queues a failed message.
#
# @return [Object] the value of attribute visibility_timeout
def visibility_timeout
  @visibility_timeout
end
Instance Method Details
#before_request(stats) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/herdst_worker/queue/processor.rb', line 101

# Hook the constructor registers on the poller; it runs before each poll
# request. In order, it:
#   1. logs the poller stats (dev configuration only),
#   2. replaces the SQS client credentials when they have expired,
#   3. triggers a one-off shutdown and re-registration once expire_at has
#      passed while the processor is "working",
#   4. moves "stopping" to "stopped" once no jobs are running,
#   5. throws :stop_polling once the status is "stopped".
#
# @param stats poller statistics; only inspected for logging
# @return [nil] when polling should continue (otherwise :stop_polling is thrown)
def before_request(stats)
  if app.config.is_dev?
    app.logger.queue_stats.info "STATS (#{processor_status}): #{stats.inspect}"
  end

  # NOTE(review): the receiver between `config.` and `.get_aws_credentials`
  # was blank in the listing this was restored from; `metadata` is assumed -
  # confirm against lib/herdst_worker/queue/processor.rb.
  if poller.client.config.credentials.has_expired?
    poller.client.config.credentials = app.config.metadata.get_aws_credentials
  end

  # After hours of running terminate application.
  # The app will automatically restart in production
  current_time = Time.now.utc.to_i
  expired = (processor_status == "working") &&
            (current_time >= expire_at) &&
            (processor_expired == false)
  if expired
    # Flag first so the shutdown is only requested once
    self.processor_expired = true
    app.force_stop(900) # NOTE(review): 900 is presumably a timeout in seconds - confirm
    app.register_new_task
  end

  # On stopping wait for jobs to complete and then set status
  # to stopped. Once stopped the polling will terminate.
  if processor_status == "stopping" && job_count == 0
    app.logger.queue.warn "Setting processor status to stopped"
    set_status "stopped"
  end

  # Once stopped exit the application
  if processor_status == "stopped"
    app.logger.queue.warn "Stopping polling, Service requested to stop"
    throw :stop_polling
  end
end
#process_message(msg) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/herdst_worker/queue/processor.rb', line 138

# Handles one message yielded by the poller.
#
# * When the processor is not "working", or is already running max_jobs
#   jobs, the message's visibility timeout is changed and :skip_delete is
#   thrown so the message stays in the queue.
# * Otherwise the body is parsed as JSON and handed to process_message!.
#   On failure the message is either reported to Sentry (attempt count above
#   attempt_threshold) or re-sent to the queue with an incremented
#   "attempts" attribute.
#
# NOTE(review): `process_message`, `change_message_visibility_timeout`,
# `message_attributes`, `message`, `process_message!`, `send_message` and
# `ex.message` were blank in the listing this was restored from; the names
# follow the Aws::SQS API and the methods inherited from Runner - confirm
# against lib/herdst_worker/queue/processor.rb.
#
# @param msg the SQS message yielded by the poller
# @raise [JSON::ParserError] when the message body is not valid JSON
def process_message(msg)
  # Not accepting work: make the message visible again shortly
  unless processor_status == "working"
    poller.change_message_visibility_timeout(msg, 5)
    throw :skip_delete
  end

  # If the app is already processing the max number of jobs
  # put the message back in the queue with a short wait time
  if job_count >= max_jobs
    poller.change_message_visibility_timeout(msg, visibility_timeout)
    throw :skip_delete
  end

  # Find out how many attempts there have been already for the message.
  msg_attrs = msg.message_attributes.dup
  attempt_number = msg_attrs.include?("attempts") ? msg_attrs["attempts"]["string_value"].to_i + 1 : 1
  will_fail_permanently = attempt_number > attempt_threshold

  # Parse before counting the job: a malformed body raises here, and
  # incrementing first would leave job_count permanently one too high.
  message = JSON.parse(msg.body)

  # Run the job and increase the job count.
  # Once successful the job count is decreased by one and the message is
  # deleted. If an error occurred the job count is decreased by one and the
  # error is logged locally and with Sentry.
  self.job_count += 1
  process_message!(message, msg, will_fail_permanently).then {
    self.job_count -= 1
  }.rescue { |ex|
    begin
      if will_fail_permanently
        self.app.logger.queue.error "Message failed #{attempt_number} times, Reporting and failing permanently. \n#{ex.to_s} \n#{ex.backtrace.join("\n")}"
        Sentry.capture_exception(ex, {
          :level => "fatal",
          :extra => {
            "queue_attempts" => attempt_number,
            "queue_message_body" => msg.body
          }
        })
      else
        self.app.logger.queue.error "Message failed #{attempt_number} times, Adding back to queue."
        self.poller.client.send_message({
          :queue_url => self.poller.queue_url,
          :message_body => msg.body,
          :delay_seconds => self.visibility_timeout,
          :message_attributes => msg_attrs.merge({
            "attempts" => {
              :string_value => attempt_number.to_s,
              :data_type => "Number"
            }
          })
        })
      end

      if self.app.config.is_dev?
        self.app.logger.queue.error "Processor Error:"
        self.app.logger.queue.error ex.message
        self.app.logger.queue.error ex.backtrace
      end
    rescue Exception => inner_ex
      # NOTE(review): deliberately broad so a failure while reporting never
      # escapes the handler; consider narrowing to StandardError.
      self.app.logger.queue.error inner_ex.message
    ensure
      self.job_count -= 1
    end
  }.execute
end
#set_status(status) ⇒ Object
Set the processor status. The status is also logged to file so services like Capistrano can see the current status.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/herdst_worker/queue/processor.rb', line 85

# Sets the processor status and writes it to "<temp path>/process_status"
# so external tooling can read the current status.
#
# @param status [String] one of "starting", "idle", "working", "finishing",
#   "stopping" or "stopped"
# @return [Integer] number of bytes written to the status file
# @raise [RuntimeError] when status is not one of the values above
def set_status(status)
  statuses = ["starting", "idle", "working", "finishing", "stopping", "stopped"]
  raise "Invalid status (#{status})" unless statuses.include?(status)

  # Set status
  self.processor_status = status

  # Write the current status to file for deployment tooling to use
  process_file = app.config.paths.temp + "/process_status"
  File.write(process_file, status)
end
#start ⇒ Object
Starts or resets the application to a working status
61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/herdst_worker/queue/processor.rb', line 61

# Starts or resets the processor to the "working" status.
#
# * "working": nothing to do.
# * "starting": first start; switch to "working", reset the timer and run
#   the poller (via #start_poller).
# * any other status: switch back to "working" and reset the timer; the
#   poller is not started again.
#
# @return [Object, nil] nil when already working; otherwise the result of
#   the last step performed
def start
  case processor_status
  when "working"
    nil
  when "starting"
    set_status "working"
    reset_time
    start_poller
  else
    set_status "working"
    reset_time
  end
end
#start_poller ⇒ Object
Runs the poller
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/herdst_worker/queue/processor.rb', line 47

# Runs the poller: every received message is handed to #process_message.
# When polling ends the task is deregistered with the application.
#
# NOTE(review): the name of the method called inside the poll block was
# blank in the listing this was restored from; `process_message` is the
# only method of this class taking a single message - confirm.
#
# @return [Object] result of app.deregister_task
# @raise [RuntimeError] when the queue is not enabled
def start_poller
  raise "Cannot start a queue which is not enabled" unless enabled

  poller.poll(:wait_time_seconds => queue_wait_time, :skip_delete => false) do |msg|
    process_message(msg)
  end

  # Only reached once the poll call above has returned
  app.deregister_task
end
#stop ⇒ Object
Sets the processor status to stopping. The SQS poller's before_request hook will take care of stopping the application once all jobs have finished.
77 78 79 80 |
# File 'lib/herdst_worker/queue/processor.rb', line 77

# Requests a shutdown by setting the status to "stopping". Does nothing
# when the processor is already stopping. #before_request finishes the
# shutdown once no jobs are running.
#
# @return [Object, nil] nil when already stopping; otherwise the result of
#   set_status
def stop
  return if processor_status == "stopping"

  set_status "stopping"
end