Class: Opener::Daemons::Daemon

Inherits:
Oni::Daemons::SQS
  • Object
show all
Defined in:
lib/opener/daemons/daemon.rb

Overview

The Daemon class communicates with an AWS SQS queue and delegates work to the mapper and worker classes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(component, options = {}) ⇒ Daemon

Returns a new instance of Daemon.

Parameters:

  • component (Class)

    The component to run in the worker.

  • options (Hash) (defaults to: {})

    Extra options to pass to the component.



31
32
33
34
35
36
# File 'lib/opener/daemons/daemon.rb', line 31

##
# Stores the component and its options on the daemon.
#
# @param [Class] component The component to run in the worker.
# @param [Hash] options Extra options to pass to the component.
#
def initialize(component, options = {})
  @component, @component_options = component, options

  # The empty parentheses are deliberate: a bare `super` would forward
  # `component` and `options`, and the parent method takes no arguments.
  super()
end

Instance Attribute Details

#component ⇒ Class (readonly)

Returns:

  • (Class)


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/opener/daemons/daemon.rb', line 13

class Daemon < Oni::Daemons::SQS
  attr_reader :component, :component_options

  # Worker and mapper classes this daemon delegates to.
  set :worker, Worker
  set :mapper, Mapper

  # Name of the SQS input queue, read from `Daemons.input_queue` whenever the
  # proc is called.
  set :queue_name, proc { Daemons.input_queue }

  # Number of workers: `DAEMON_WORKERS` converted with `to_i`, or 1 when the
  # variable is not set.
  set :workers, ENV.fetch('DAEMON_WORKERS', 1).to_i

  # Number of threads, read from `Daemons.daemon_threads` whenever the proc
  # is called.
  set :threads, proc { Daemons.daemon_threads }

  ##
  # Stores the component and its options on the daemon.
  #
  # @param [Class] component The component to run in the worker.
  # @param [Hash] options Extra options to pass to the component.
  #
  def initialize(component, options = {})
    @component, @component_options = component, options

    # The empty parentheses are deliberate: a bare `super` would forward
    # `component` and `options`, and the parent method takes no arguments.
    super()
  end

  ##
  # Called before the daemon is started.
  #
  # Opens syslog under the name in `ENV['APP_NAME']`, logs a start-up line
  # with the queue name and thread count, enables the GC profiler,
  # configures Rollbar, starts NewRelic when `Daemons.newrelic?` is truthy
  # and asks Aws to eager-autoload the S3 and SQS services.
  #
  def before_start
    syslog_flags = ::Syslog::LOG_CONS | ::Syslog::LOG_PID

    Core::Syslog.open(ENV['APP_NAME'], syslog_flags)
    Core::Syslog.info(
      'Starting daemon',
      queue: option(:queue_name),
      threads: threads
    )

    GC::Profiler.enable
    Daemons.configure_rollbar

    if Daemons.newrelic?
      NewRelic::Agent.manual_start
    end

    Aws.eager_autoload!(services: %w[S3 SQS])
  end

  ##
  # Overwrites the original method so that the component and its options can
  # be injected into the mapper.
  #
  # @see Oni::Daemon#create_mapper
  #
  # @return [Object] A new instance of the class set in the `:mapper` option.
  # @raise [ArgumentError] When the `:mapper` option is not set.
  #
  def create_mapper
    if option(:mapper)
      option(:mapper).new(component, component_options)
    else
      raise ArgumentError, 'No mapper has been set in the `:mapper` option'
    end
  end

  ##
  # Called when an error occurs. Hands the error to #report_exception.
  #
  # @param [StandardError] error
  #
  def error(error)
    report_exception(error)
  end

  ##
  # Logs that a message has been finished and resets the current
  # transaction, even when logging raises.
  #
  # @param [AWS::SQS::ReceivedMessage] message
  # @param [Mixed] output Not used by this method.
  #
  def complete(message, output)
    Core::Syslog.info("Finished message #{message.message_id}")
  ensure
    Transaction.reset_current
  end

  ##
  # Sends an error to Rollbar when `Daemons.rollbar?` is truthy and
  # re-raises it otherwise. The current transaction is reset in both cases.
  #
  # @param [StandardError] error
  # @raise [StandardError] The given error, when Rollbar is not enabled.
  #
  def report_exception(error)
    raise error unless Daemons.rollbar?

    Rollbar.error(
      error,
      active_threads: Thread.list.count,
      ruby_description: RUBY_DESCRIPTION,
      parameters: Transaction.current.parameters
    )
  ensure
    Transaction.reset_current
  end
end

#component_options ⇒ Hash (readonly)

Returns:

  • (Hash)


13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/opener/daemons/daemon.rb', line 13

class Daemon < Oni::Daemons::SQS
  attr_reader :component, :component_options

  # Worker and mapper classes this daemon delegates to.
  set :worker, Worker
  set :mapper, Mapper

  # Name of the SQS input queue, read from `Daemons.input_queue` whenever the
  # proc is called.
  set :queue_name, proc { Daemons.input_queue }

  # Number of workers: `DAEMON_WORKERS` converted with `to_i`, or 1 when the
  # variable is not set.
  set :workers, ENV.fetch('DAEMON_WORKERS', 1).to_i

  # Number of threads, read from `Daemons.daemon_threads` whenever the proc
  # is called.
  set :threads, proc { Daemons.daemon_threads }

  ##
  # Stores the component and its options on the daemon.
  #
  # @param [Class] component The component to run in the worker.
  # @param [Hash] options Extra options to pass to the component.
  #
  def initialize(component, options = {})
    @component, @component_options = component, options

    # The empty parentheses are deliberate: a bare `super` would forward
    # `component` and `options`, and the parent method takes no arguments.
    super()
  end

  ##
  # Called before the daemon is started.
  #
  # Opens syslog under the name in `ENV['APP_NAME']`, logs a start-up line
  # with the queue name and thread count, enables the GC profiler,
  # configures Rollbar, starts NewRelic when `Daemons.newrelic?` is truthy
  # and asks Aws to eager-autoload the S3 and SQS services.
  #
  def before_start
    syslog_flags = ::Syslog::LOG_CONS | ::Syslog::LOG_PID

    Core::Syslog.open(ENV['APP_NAME'], syslog_flags)
    Core::Syslog.info(
      'Starting daemon',
      queue: option(:queue_name),
      threads: threads
    )

    GC::Profiler.enable
    Daemons.configure_rollbar

    if Daemons.newrelic?
      NewRelic::Agent.manual_start
    end

    Aws.eager_autoload!(services: %w[S3 SQS])
  end

  ##
  # Overwrites the original method so that the component and its options can
  # be injected into the mapper.
  #
  # @see Oni::Daemon#create_mapper
  #
  # @return [Object] A new instance of the class set in the `:mapper` option.
  # @raise [ArgumentError] When the `:mapper` option is not set.
  #
  def create_mapper
    if option(:mapper)
      option(:mapper).new(component, component_options)
    else
      raise ArgumentError, 'No mapper has been set in the `:mapper` option'
    end
  end

  ##
  # Called when an error occurs. Hands the error to #report_exception.
  #
  # @param [StandardError] error
  #
  def error(error)
    report_exception(error)
  end

  ##
  # Logs that a message has been finished and resets the current
  # transaction, even when logging raises.
  #
  # @param [AWS::SQS::ReceivedMessage] message
  # @param [Mixed] output Not used by this method.
  #
  def complete(message, output)
    Core::Syslog.info("Finished message #{message.message_id}")
  ensure
    Transaction.reset_current
  end

  ##
  # Sends an error to Rollbar when `Daemons.rollbar?` is truthy and
  # re-raises it otherwise. The current transaction is reset in both cases.
  #
  # @param [StandardError] error
  # @raise [StandardError] The given error, when Rollbar is not enabled.
  #
  def report_exception(error)
    raise error unless Daemons.rollbar?

    Rollbar.error(
      error,
      active_threads: Thread.list.count,
      ruby_description: RUBY_DESCRIPTION,
      parameters: Transaction.current.parameters
    )
  ensure
    Transaction.reset_current
  end
end

Instance Method Details

#before_start ⇒ Object

Called before the daemon is started.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/opener/daemons/daemon.rb', line 41

##
# Called before the daemon is started.
#
# Opens syslog under the name in `ENV['APP_NAME']`, logs a start-up line with
# the queue name and thread count, enables the GC profiler, configures
# Rollbar, starts NewRelic when `Daemons.newrelic?` is truthy and asks Aws to
# eager-autoload the S3 and SQS services.
#
def before_start
  syslog_flags = ::Syslog::LOG_CONS | ::Syslog::LOG_PID

  Core::Syslog.open(ENV['APP_NAME'], syslog_flags)
  Core::Syslog.info(
    'Starting daemon',
    queue: option(:queue_name),
    threads: threads
  )

  GC::Profiler.enable
  Daemons.configure_rollbar

  if Daemons.newrelic?
    NewRelic::Agent.manual_start
  end

  Aws.eager_autoload!(services: %w[S3 SQS])
end

#complete(message, output) ⇒ Object

Parameters:

  • message (AWS::SQS::ReceivedMessage)
  • output (Mixed)
  • timings (Benchmark::Tms)


90
91
92
93
94
95
96
97
# File 'lib/opener/daemons/daemon.rb', line 90

##
# Logs that a message has been finished and resets the current transaction,
# even when logging raises.
#
# @param [AWS::SQS::ReceivedMessage] message
# @param [Mixed] output Not used by this method.
#
def complete(message, output)
  Core::Syslog.info("Finished message #{message.message_id}")
ensure
  Transaction.reset_current
end

#create_mapper ⇒ Object

Overwrites the original method so that we can inject the component into the mapper.

See Also:

  • Oni::Daemon#create_mapper


68
69
70
71
72
73
74
# File 'lib/opener/daemons/daemon.rb', line 68

##
# Overwrites the original method so that the component and its options can be
# injected into the mapper.
#
# @return [Object] A new instance of the class set in the `:mapper` option.
# @raise [ArgumentError] When the `:mapper` option is not set.
#
def create_mapper
  if option(:mapper)
    option(:mapper).new(component, component_options)
  else
    raise ArgumentError, 'No mapper has been set in the `:mapper` option'
  end
end

#error(error) ⇒ Object

Called when an error occurs.

Parameters:

  • error (StandardError)


81
82
83
# File 'lib/opener/daemons/daemon.rb', line 81

##
# Called when an error occurs. Hands the error to #report_exception.
#
# @param [StandardError] error
#
def error(error)
  report_exception(error)
end

#report_exception(error) ⇒ Object

Sends an error to Rollbar.

Parameters:

  • error (StandardError)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/opener/daemons/daemon.rb', line 104

##
# Sends an error to Rollbar when `Daemons.rollbar?` is truthy and re-raises
# it otherwise. The current transaction is reset in both cases.
#
# @param [StandardError] error
# @raise [StandardError] The given error, when Rollbar is not enabled.
#
def report_exception(error)
  raise error unless Daemons.rollbar?

  Rollbar.error(
    error,
    active_threads: Thread.list.count,
    ruby_description: RUBY_DESCRIPTION,
    parameters: Transaction.current.parameters
  )
ensure
  Transaction.reset_current
end