Class: Beehive::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/beehive/worker.rb

Overview

The Worker class is used to retrieve and process jobs (in the background). Whenever a job is received the worker will fork itself and execute the job in that process. This is useful because any errors won’t crash the worker itself plus it will reduce the memory usage as once Ruby allocates memory to a process it’s never released unless that process exits.

Author:

  • Yorick Peterse

Since:

  • 0.1

Constant Summary collapse

Options =

Hash containing the default worker options.

Author:

  • Yorick Peterse

Since:

  • 0.1

{
  :logger    => ::Logger.new(STDOUT),
  :daemonize => false,
  :jobs      => [],
  :wait      => 5,
  :log_level => Logger::WARN,
  :pid       => File.join(Dir.pwd, 'worker.pid')
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_options = {}, worker_options = {}) ⇒ Worker

Creates a new instance of the class, sets all the options and connects to the Redis database.

Redis#new for more information. with Logger of the standard library. reduces CPU and network usage. Logger::WARN by default.

Examples:

worker = Beehive::Worker.new({}, {:jobs => ['video'], :wait => 2})

Parameters:

  • redis_options (Hash) (defaults to: {})

    Hash containing all the options to use for Redis. See

  • worker_options (Hash) (defaults to: {})

    Hash containing worker specific options.

Options Hash (worker_options):

  • :logger (Object)

    The logger that should be used, has to be compatible

  • :daemonize (Object)

    Tells the worker to run in the background.

  • :jobs (Object)

    An array of jobs the current worker has to process.

  • :wait (Object)

    The amount of seconds to wait after each iteration,

  • :log_level (Object)

    The log even to use for the :logger option, set to

  • :pid (Object)

    Path to the location of the PID file for the worker.

Author:

  • Yorick Peterse

Since:

  • 0.1



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/beehive/worker.rb', line 56

def initialize(redis_options = {}, worker_options = {})
  @connection             = ::Beehive::Client.new(redis_options)
  @options                = Options.merge(worker_options)
  @options[:logger].level = @options[:log_level]
  @shutdown               = false

  # Check if the given jobs are valid
  @options[:jobs].each do |job|
    if !::Beehive::Jobs.key?(job)
      raise("The job \"#{job}\" is invalid as it could not be found in Beehive::Jobs")
    end
  end

  trap_signals
end

Instance Attribute Details

#connectionObject (readonly)

Instance of Beehive::Client, used for communicating with the Redis server

Since:

  • 0.1



29
30
31
# File 'lib/beehive/worker.rb', line 29

def connection
  @connection
end

#optionsObject

Hash containing all the custom configuration options

Since:

  • 0.1



32
33
34
# File 'lib/beehive/worker.rb', line 32

def options
  @options
end

Instance Method Details

#workObject

Waits for available jobs and execute a job whenever one is available.

Examples:

worker = Beehive::Worker.new
worker.work

Author:

  • Yorick Peterse

Since:

  • 0.1



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/beehive/worker.rb', line 82

def work
  # Daemonize the process?
  if @options[:daemonize] === true
    Process.daemon(true)
  end

  @worker_pid = Process.pid

  @options[:logger].info("Starting main worker, PID: #{@worker_pid}")
  write_pid

  loop do
    if @shutdown === true
      @options[:logger].info('The party has ended, time to shut down')
      @connection.disconnect
      File.unlink(@options[:pid])
      exit
    end

    # Reset the child PID
    @child_pid = nil

    # Check if there are any jobs available
    @options[:jobs].each do |job|
      params = @connection.get(job)

      if params
        @options[:logger].info(
          "Received the job \"#{job}\" with the following data: #{params.inspect}"
        )

        # Fork the process and run the job
        @child_pid = Process.fork do
          @options[:logger].info('Process forked, executing job')

          begin
            ::Beehive::Jobs[job].call(params, @options[:logger])

            @options[:logger].info('Job successfully processed')
            exit
          rescue => e
            @options[:logger].error("Failed to execute the job: #{e.inspect}")
          end
        end

        # Wait for the job to finish. This prevents this worker from spawning a worker
        # for each job it has to process (which could result in hundreds of processes
        # being spawned.
        Process.waitpid(@child_pid)
      end
    end

    # Did the PID change for some reason?
    if Process.pid != @worker_pid
      @worker_pid = Process.pid
      write_pid
    end

    # Reduces CPU load and network traffic
    sleep(@options[:wait])
  end
end