Class: Beaneater::Jobs

Inherits:
Object
Defined in:
lib/beaneater/job/collection.rb

Overview

Represents a collection of job-related commands.

Constant Summary

MAX_RETRIES = 3

Number of retries to process a job.

RELEASE_DELAY = 1

Delay in seconds before a released job becomes ready again.

RESERVE_TIMEOUT = nil

Number of seconds to wait for a job before checking a different server.

Constructor Details

#initialize(client) ⇒ Jobs

Creates new jobs instance.

Examples:

Beaneater::Jobs.new(@client)

Parameters:

  • client (Beaneater)

    The beaneater client instance.



# File 'lib/beaneater/job/collection.rb', line 32

def initialize(client)
  @client = client
end

Instance Attribute Details

#client ⇒ Beaneater

Returns the client instance.

Returns:

  • (Beaneater)

    The client instance



# File 'lib/beaneater/job/collection.rb', line 15

attr_reader :processors, :client, :current_job

#current_job ⇒ Object

Returns the value of attribute current_job.



# File 'lib/beaneater/job/collection.rb', line 15

attr_reader :processors, :client, :current_job

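#processors ⇒ Array<Proc>

Returns the collection of procs registered to handle beanstalkd jobs. As populated by #register, this is a Hash keyed by tube name whose values hold the :block, :retry_on, and :max_retries settings.

Returns:

  • (Array<Proc>)

    Collection of procs that handle beanstalkd jobs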



# File 'lib/beaneater/job/collection.rb', line 15

def processors
  @processors
end

Instance Method Details

#find(id) ⇒ Beaneater::Job Also known as: peek, []

Peek at (or find) a job by id in beanstalkd.

Examples:

@beaneater.jobs[123] # => <Beaneater::Job>
@beaneater.jobs.find(123) # => <Beaneater::Job>
@beaneater.jobs.peek(123) # => <Beaneater::Job>

Parameters:

  • id (Integer)

    Job id to find

Returns:

  • (Beaneater::Job)

    The job with the given id, or nil if beanstalkd reports it as not found



# File 'lib/beaneater/job/collection.rb', line 53

def find(id)
  res = transmit("peek #{id}")
  Job.new(client, res)
rescue Beaneater::NotFoundError
  nil
end
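
Because find rescues Beaneater::NotFoundError and returns nil, callers need a nil check before using the result. The following is a minimal sketch of that lookup-or-nil pattern using stand-in classes so that it runs without a beanstalkd server. FakeJobs, SketchNotFoundError, and the in-memory store are illustrative assumptions and are not part of Beaneater.

```ruby
# Stand-in for Beaneater::NotFoundError (assumption for this sketch).
class SketchNotFoundError < StandardError; end

# Hypothetical in-memory collection that mimics the lookup-or-nil behavior.
class FakeJobs
  def initialize(store)
    @store = store
  end

  # Returns the stored value, or nil when the id is unknown.
  def find(id)
    @store.fetch(id) { raise SketchNotFoundError, "job #{id} not found" }
  rescue SketchNotFoundError
    nil
  end
  alias_method :peek, :find
  alias_method :[], :find
end

jobs = FakeJobs.new(123 => 'payload')
job = jobs[999]
puts(job ? "found #{job}" : 'no such job')
```

With the real client the same guard applies: check the return value of find, peek, or [] before calling methods on it.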

#process!(options = {}) ⇒ Object

Watches the registered tubes, then reserves and processes jobs in a loop. Each job is deleted on success, released for a retry, or buried.

Parameters:

  • options (Hash{Symbol => Integer}) (defaults to: {})

    Settings for processing

Options Hash (options):

  • release_delay (Integer)

    Delay in seconds before a released job becomes ready again

  • reserve_timeout (Integer)

    Number of seconds to wait for a job before checking a different server
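
The two options are read with Hash#delete and fall back to the class constants, as the source listing for this method shows. The sketch below isolates that defaulting logic to make two consequences visible: only Symbol keys are recognized, and the hash passed in is emptied of those keys. ProcessOptionsSketch is a hypothetical module written for illustration; its constants copy the values documented in the Constant Summary.

```ruby
# Hypothetical module; not part of Beaneater.
module ProcessOptionsSketch
  RELEASE_DELAY = 1
  RESERVE_TIMEOUT = nil

  # Resolves both settings the same way the process! listing does.
  def self.resolve(options = {})
    release_delay = options.delete(:release_delay) || RELEASE_DELAY
    reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
    [release_delay, reserve_timeout]
  end
end

opts = { :release_delay => 5 }
p ProcessOptionsSketch.resolve(opts)                 # Symbol key is honored
p ProcessOptionsSketch.resolve('release_delay' => 5) # String key is ignored
p opts                                               # keys were removed by Hash#delete
```

If the caller reuses the options hash afterwards, passing a copy (for example options.dup) avoids the mutation.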



# File 'lib/beaneater/job/collection.rb', line 106

def process!(options={})
  release_delay = options.delete(:release_delay) || RELEASE_DELAY
  reserve_timeout = options.delete(:reserve_timeout) || RESERVE_TIMEOUT
  client.tubes.watch!(*processors.keys)
  while !stop? do
    begin
      @current_job = client.tubes.reserve(reserve_timeout)
      processor = processors[@current_job.tube]
      begin
        processor[:block].call(@current_job)
        @current_job.delete
      rescue *processor[:retry_on]
        if @current_job.stats.releases < processor[:max_retries]
          @current_job.release(:delay => release_delay)
        end
      end
    rescue AbortProcessingError
      break
    rescue Beaneater::JobNotReserved, Beaneater::NotFoundError, Beaneater::TimedOutError
      retry
    rescue StandardError # handles unspecified errors
      @current_job.bury if @current_job
    ensure # bury if still reserved
      @current_job.bury if @current_job && @current_job.exists? && @current_job.reserved?
      @current_job = nil
    end
  end
end
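
The nested rescue and ensure clauses above determine what happens to each job. As a rough summary, the sketch below names the action the loop ends up taking for one job, given the error raised by the processor block (nil for success) and the number of times the job has already been released. ProcessOutcomeSketch is a hypothetical helper, not part of Beaneater, and it deliberately leaves out AbortProcessingError and the reserve-level errors that trigger a retry of the reservation.

```ruby
# Hypothetical helper; mirrors the delete / release / bury decision only.
module ProcessOutcomeSketch
  def self.outcome(error, releases:, retry_on: [], max_retries: 3)
    return :delete if error.nil?

    retryable = retry_on.any? { |klass| error.is_a?(klass) }
    return :release if retryable && releases < max_retries

    :bury # retries exhausted, or an error not listed in retry_on
  end
end

p ProcessOutcomeSketch.outcome(nil, releases: 0)
p ProcessOutcomeSketch.outcome(IOError.new, releases: 1, retry_on: [IOError])
p ProcessOutcomeSketch.outcome(IOError.new, releases: 3, retry_on: [IOError])
p ProcessOutcomeSketch.outcome(ArgumentError.new, releases: 0, retry_on: [IOError])
```

In the listing, the bury for an exhausted retry comes from the ensure clause: the inner rescue swallows the error without releasing, so the job is still reserved when ensure runs.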

#register(tube_name, options = {}, &block) ⇒ Object

Registers a processor to handle beanstalkd jobs on a particular tube.

Examples:

@beanstalk.jobs.register('some-tube', :retry_on => [SomeError]) do |job|
  do_something(job)
end

@beanstalk.jobs.register('other-tube') do |job|
  do_something_else(job)
end

Parameters:

  • tube_name (String)

    Tube name

  • options (Hash{Symbol => Object}) (defaults to: {})

    settings for processor

  • block (Proc)

    Process beanstalkd job

Options Hash (options):

  • max_retries (Integer)

    Number of retries to process a job

  • retry_on (Array<RuntimeError>)

    Collection of errors to rescue and re-run processor



# File 'lib/beaneater/job/collection.rb', line 80

def register(tube_name, options={}, &block)
  @processors ||= {}
  max_retries = options[:max_retries] || MAX_RETRIES
  retry_on = Array(options[:retry_on])
  @processors[tube_name.to_s] = { :block => block, :retry_on => retry_on, :max_retries => max_retries }
end
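
To see what register stores, the sketch below copies the method body into a stand-in class and inspects the resulting entry. It shows three normalizations visible in the listing: the tube name is converted with to_s, a single exception class is wrapped by Array(), and max_retries falls back to MAX_RETRIES. RegistrySketch is a hypothetical class used only so the example runs without a beanstalkd connection.

```ruby
# Hypothetical stand-in; the register body is copied from the listing above.
class RegistrySketch
  MAX_RETRIES = 3
  attr_reader :processors

  def register(tube_name, options = {}, &block)
    @processors ||= {}
    max_retries = options[:max_retries] || MAX_RETRIES
    retry_on = Array(options[:retry_on])
    @processors[tube_name.to_s] = { :block => block, :retry_on => retry_on, :max_retries => max_retries }
  end
end

registry = RegistrySketch.new
registry.register(:emails, :retry_on => IOError) { |job| job }

entry = registry.processors['emails'] # Symbol name is stored as a String key
p entry[:retry_on]                    # single class wrapped in an Array
p entry[:max_retries]                 # default taken from MAX_RETRIES
```

Registering the same tube name twice replaces the earlier entry, since the hash is keyed by tube name.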

#stop! ⇒ Object

Sets flag to indicate that process loop should stop after current job



# File 'lib/beaneater/job/collection.rb', line 88

def stop!
  @stop = true
end

#stop? ⇒ Boolean

Returns whether the process loop should stop

Returns:

  • (Boolean)

    true if the loop should stop after the current job finishes



# File 'lib/beaneater/job/collection.rb', line 95

def stop?
  !!@stop
end
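
The stop flag is cooperative: it is only checked between jobs, so a job that is already being processed finishes first. The sketch below illustrates this with a simplified loop over an in-memory list. StoppableLoopSketch and its run method are hypothetical and stand in for the process! loop; only stop! and stop? are copied from the listings.

```ruby
# Hypothetical stand-in for the processing loop.
class StoppableLoopSketch
  def stop!
    @stop = true
  end

  def stop?
    !!@stop
  end

  # Yields each job until the list is exhausted or stop! has been called.
  def run(jobs)
    handled = []
    jobs.each do |job|
      break if stop? # checked before each job, never in the middle of one
      yield job, self
      handled << job
    end
    handled
  end
end

worker = StoppableLoopSketch.new
handled = worker.run([1, 2, 3]) { |job, w| w.stop! if job == 2 }
p handled
```

The job that calls stop! is still completed and recorded; only the jobs after it are skipped.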

#transmit(command, options = {}) ⇒ Object

Delegates transmit to the connection object.



# File 'lib/beaneater/job/collection.rb', line 39

def transmit(command, options={})
  client.connection.transmit(command, **options)
end