Class: Opener::Daemons::Daemon
- Inherits: Oni::Daemons::SQS
  - Object
  - Oni::Daemons::SQS
  - Opener::Daemons::Daemon
- Defined in: lib/opener/daemons/daemon.rb
Overview
The Daemon class communicates with an AWS SQS queue and delegates work to the mapper and worker classes.
Instance Attribute Summary
- #component ⇒ Class readonly
- #component_options ⇒ Hash readonly
Instance Method Summary
- #before_start ⇒ Object
  Called before the daemon is started.
- #complete(message, output) ⇒ Object
- #create_mapper ⇒ Object
  Overrides the original method so that the component can be injected into the mapper.
- #error(error) ⇒ Object
  Called when an error occurs.
- #initialize(component, options = {}) ⇒ Daemon (constructor)
  A new instance of Daemon.
- #report_exception(error) ⇒ Object
  Sends an error to Rollbar.
Constructor Details
#initialize(component, options = {}) ⇒ Daemon
Returns a new instance of Daemon.
    # File 'lib/opener/daemons/daemon.rb', line 31
    def initialize(component, options = {})
      @component         = component
      @component_options = options

      super() # keep parenthesis, parent method doesn't take arguments.
    end
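The comment on `super()` in the constructor matters because Ruby treats a bare `super` differently from `super()`: the bare form forwards the current method's arguments to the parent, while the parenthesized form passes none. The sketch below illustrates this with hypothetical stand-in classes (`BaseDaemon`, `ComponentDaemon`, `BrokenDaemon`); none of them are part of Oni or Opener.

```ruby
# Hypothetical stand-in for a parent class whose initializer takes no arguments.
class BaseDaemon
  attr_reader :started

  def initialize
    @started = false
  end
end

class ComponentDaemon < BaseDaemon
  attr_reader :component, :component_options

  def initialize(component, options = {})
    @component         = component
    @component_options = options

    # Explicit empty parentheses: call BaseDaemon#initialize with no arguments.
    super()
  end
end

class BrokenDaemon < BaseDaemon
  def initialize(component, options = {})
    # Bare `super` forwards (component, options) to the parent,
    # which accepts no arguments, so this raises ArgumentError.
    super
  end
end

daemon = ComponentDaemon.new(String, :language => 'en')

forwarding_failed =
  begin
    BrokenDaemon.new(String)
    false
  rescue ArgumentError
    true
  end
```

With these stand-ins, `ComponentDaemon.new` succeeds and keeps both attributes, while `BrokenDaemon.new` fails before the object is fully constructed.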
Instance Attribute Details
#component ⇒ Class (readonly)
    # File 'lib/opener/daemons/daemon.rb', line 13
    class Daemon < Oni::Daemons::SQS
      attr_reader :component, :component_options

      set :worker, Worker
      set :mapper, Mapper

      # The name of the SQS input queue to use.
      set :queue_name, proc { Daemons.input_queue }

      # The amount of workers to use.
      set :workers, ENV['DAEMON_WORKERS']&.to_i || 1

      # The amount of threads to use.
      set :threads, proc { Daemons.daemon_threads }

      ##
      # @param [Class] component The component to run in the worker.
      # @param [Hash] options Extra options to pass to the component.
      #
      def initialize(component, options = {})
        @component         = component
        @component_options = options

        super() # keep parenthesis, parent method doesn't take arguments.
      end

      ##
      # Called before the daemon is started.
      #
      def before_start
        Core::Syslog.open(
          ENV['APP_NAME'],
          ::Syslog::LOG_CONS | ::Syslog::LOG_PID
        )

        Core::Syslog.info(
          'Starting daemon',
          :queue   => option(:queue_name),
          :threads => threads
        )

        GC::Profiler.enable

        Daemons.

        NewRelic::Agent.manual_start if Daemons.newrelic?

        Aws.eager_autoload!(:services => %w{S3 SQS})
      end

      ##
      # Overwrites the original method so that we can inject the component into
      # the mapper.
      #
      # @see [Oni::Daemon#create_mapper]
      #
      def create_mapper
        unless option(:mapper)
          raise ArgumentError, 'No mapper has been set in the `:mapper` option'
        end

        return option(:mapper).new(component, )
      end

      ##
      # Called when an error occurs.
      #
      # @param [StandardError] error
      #
      def error(error)
        report_exception(error)
      end

      ##
      # @param [AWS::SQS::ReceivedMessage] message
      # @param [Mixed] output
      # @param [Benchmark::Tms] timings
      #
      def complete(message, output)
        log_msg = "Finished message #{.}"

        Core::Syslog.info(log_msg)
      ensure
        Transaction.reset_current
      end

      ##
      # Sends an error to Rollbar.
      #
      # @param [StandardError] error
      #
      def report_exception(error)
        if Daemons.
          Rollbar.error(
            error,
            :active_threads   => Thread.list.count,
            :ruby_description => RUBY_DESCRIPTION,
            :parameters       => Transaction.current.parameters
          )
        else
          raise error
        end
      ensure
        Transaction.reset_current
      end
    end
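The `:workers` setting in the class body combines the safe navigation operator with a fallback: `ENV['DAEMON_WORKERS']&.to_i || 1`. A minimal sketch of how that expression evaluates, using a hypothetical helper `worker_count` that takes the environment as a plain hash so it can be exercised without touching `ENV`:

```ruby
# Hypothetical helper mirroring the expression used for the :workers setting.
# `env` is any hash-like object; it defaults to the process environment.
def worker_count(env = ENV)
  # `&.` skips the to_i call when the variable is unset (nil),
  # so the `|| 1` fallback applies only in that case.
  env['DAEMON_WORKERS']&.to_i || 1
end

unset_count   = worker_count({})
numeric_count = worker_count({ 'DAEMON_WORKERS' => '4' })

# A non-numeric string converts to 0, and 0 is truthy in Ruby,
# so the fallback does not apply here.
garbage_count = worker_count({ 'DAEMON_WORKERS' => 'many' })
```

One consequence worth noting: the fallback covers only an unset variable. A value that is set but not numeric yields `0` rather than `1`.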
#component_options ⇒ Hash (readonly)
Instance Method Details
#before_start ⇒ Object
Called before the daemon is started.
    # File 'lib/opener/daemons/daemon.rb', line 41
    def before_start
      Core::Syslog.open(
        ENV['APP_NAME'],
        ::Syslog::LOG_CONS | ::Syslog::LOG_PID
      )

      Core::Syslog.info(
        'Starting daemon',
        :queue   => option(:queue_name),
        :threads => threads
      )

      GC::Profiler.enable

      Daemons.

      NewRelic::Agent.manual_start if Daemons.newrelic?

      Aws.eager_autoload!(:services => %w{S3 SQS})
    end
#complete(message, output) ⇒ Object
    # File 'lib/opener/daemons/daemon.rb', line 90
    def complete(message, output)
      log_msg = "Finished message #{.}"

      Core::Syslog.info(log_msg)
    ensure
      Transaction.reset_current
    end
#create_mapper ⇒ Object
Overrides the original method so that the component can be injected into the mapper. Raises ArgumentError when no `:mapper` option has been set.
    # File 'lib/opener/daemons/daemon.rb', line 68
    def create_mapper
      unless option(:mapper)
        raise ArgumentError, 'No mapper has been set in the `:mapper` option'
      end

      return option(:mapper).new(component, )
    end
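The override pattern used by `create_mapper` can be sketched in isolation. Everything below is a hypothetical stand-in: `BaseDaemon` only imitates a parent class with `set`/`option` helpers and a no-argument mapper constructor, and `ComponentMapper` is invented for illustration. The second constructor argument is not visible in the listing above; passing `component_options` here is an assumption.

```ruby
# Hypothetical base class: stores class-level options and builds the
# mapper without any constructor arguments.
class BaseDaemon
  def self.options
    @options ||= {}
  end

  def self.set(name, value)
    options[name] = value
  end

  def option(name)
    self.class.options[name]
  end

  def create_mapper
    option(:mapper).new
  end
end

# Hypothetical mapper that needs to know which component it serves.
class ComponentMapper
  attr_reader :component, :component_options

  def initialize(component, component_options = {})
    @component         = component
    @component_options = component_options
  end
end

class ComponentDaemon < BaseDaemon
  attr_reader :component, :component_options

  set :mapper, ComponentMapper

  def initialize(component, options = {})
    @component         = component
    @component_options = options
  end

  # Override: same lookup as the parent, but the mapper receives the
  # component (and, as an assumption, its options).
  def create_mapper
    unless option(:mapper)
      raise ArgumentError, 'No mapper has been set in the `:mapper` option'
    end

    option(:mapper).new(component, component_options)
  end
end

daemon = ComponentDaemon.new(String, :language => 'en')
mapper = daemon.create_mapper
```

Overriding the factory method keeps the mapper class configurable through the `:mapper` option while letting the daemon decide what the mapper is constructed with.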
#error(error) ⇒ Object
Called when an error occurs.
    # File 'lib/opener/daemons/daemon.rb', line 81
    def error(error)
      report_exception(error)
    end
#report_exception(error) ⇒ Object
Sends an error to Rollbar together with the active thread count, the Ruby description, and the current transaction's parameters. When the `Daemons` check in the source does not pass, the error is re-raised instead. The current transaction is reset in both cases.
    # File 'lib/opener/daemons/daemon.rb', line 104
    def report_exception(error)
      if Daemons.
        Rollbar.error(
          error,
          :active_threads   => Thread.list.count,
          :ruby_description => RUBY_DESCRIPTION,
          :parameters       => Transaction.current.parameters
        )
      else
        raise error
      end
    ensure
      Transaction.reset_current
    end
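The report-or-raise shape of `report_exception`, with the transaction reset in an `ensure` clause, can be tried out without Rollbar. The sketch below is only an approximation under stated assumptions: `Transaction` is a hypothetical thread-local stand-in, `MemoryReporter` is an invented in-memory replacement for an error tracker, and the optional `reporter` argument replaces the configuration check that is not visible in the listing.

```ruby
# Hypothetical per-thread transaction state.
class Transaction
  attr_accessor :parameters

  def self.current
    Thread.current[:transaction] ||= new
  end

  def self.reset_current
    Thread.current[:transaction] = nil
  end

  def initialize
    @parameters = {}
  end
end

# Hypothetical reporter that records errors in memory.
class MemoryReporter
  attr_reader :reports

  def initialize
    @reports = []
  end

  def error(error, context = {})
    @reports << [error, context]
  end
end

# Reports the error when a reporter is available, re-raises it otherwise.
# The ensure clause runs on both paths, so no state leaks between messages.
def report_exception(error, reporter = nil)
  if reporter
    reporter.error(
      error,
      :active_threads => Thread.list.count,
      :parameters     => Transaction.current.parameters
    )
  else
    raise error
  end
ensure
  Transaction.reset_current
end

reporter = MemoryReporter.new

Transaction.current.parameters = { :input => 'text' }
report_exception(RuntimeError.new('boom'), reporter)
cleared_after_report = Thread.current[:transaction].nil?

Transaction.current.parameters = { :input => 'other' }
reraised =
  begin
    report_exception(RuntimeError.new('boom'))
    false
  rescue RuntimeError
    true
  end
cleared_after_raise = Thread.current[:transaction].nil?
```

Putting the reset in `ensure` rather than after the `if` is what guarantees cleanup on the re-raise path as well.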