Module: RocketJob::Plugins::Job::Worker

Extended by:
ActiveSupport::Concern
Included in:
Job
Defined in:
lib/rocket_job/plugins/job/worker.rb

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary collapse

Instance Method Details

#fail_on_exception!(re_raise_exceptions = false, &block) ⇒ Object

Fail this job in the event of an exception.

The job is automatically saved only if an exception is raised in the supplied block.

re_raise_exceptions: [true|false]

Re-raise the exception after updating the job
Default: false


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rocket_job/plugins/job/worker.rb', line 69

def fail_on_exception!(re_raise_exceptions = false, &block)
  SemanticLogger.named_tagged(job: id.to_s, &block)
rescue Exception => e
  SemanticLogger.named_tagged(job: id.to_s) do
    if failed? || !may_fail?
      self.exception        = JobException.from_exception(e)
      exception.worker_name = worker_name
      save! unless new_record? || destroyed?
    elsif new_record? || destroyed?
      fail(worker_name, e)
    else
      fail!(worker_name, e)
    end
    raise e if re_raise_exceptions
  end
end

#performObject

Raises:

  • (NotImplementedError)


58
59
60
# File 'lib/rocket_job/plugins/job/worker.rb', line 58

def perform(*)
  raise NotImplementedError
end

#perform_nowObject

Runs the job now in the current thread.

Validations are called prior to running the job.

The job is not saved and therefore the following callbacks are not called:

  • before_save

  • after_save

  • before_create

  • after_create

Exceptions are not suppressed and should be handled by the caller.

Raises:

  • (::Mongoid::Errors::Validations)


48
49
50
51
52
53
54
55
56
# File 'lib/rocket_job/plugins/job/worker.rb', line 48

def perform_now
  raise(::Mongoid::Errors::Validations, self) unless valid?

  worker = RocketJob::Worker.new(inline: true)
  start if may_start?
  # Re-Raise exceptions
  rocket_job_work(worker, true) if running?
  result
end

#rocket_job_active_workers(server_name = nil) ⇒ Object

Returns [Hash<String:>] All servers actively working on this job



123
124
125
126
127
# File 'lib/rocket_job/plugins/job/worker.rb', line 123

def rocket_job_active_workers(server_name = nil)
  return [] if !running? || (server_name && !worker_on_server?(server_name))

  [ActiveWorker.new(worker_name, started_at, self)]
end

#rocket_job_work(_worker, re_raise_exceptions = false) ⇒ Object

Works on this job

Returns [true|false] whether any work was performed.

If an exception is thrown the job is marked as failed and the exception is set in the job itself.

Thread-safe, can be called by multiple threads at the same time

Raises:

  • (ArgumentError)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/rocket_job/plugins/job/worker.rb', line 94

def rocket_job_work(_worker, re_raise_exceptions = false)
  raise(ArgumentError, "Job must be started before calling #rocket_job_work") unless running?

  fail_on_exception!(re_raise_exceptions) do
    if _perform_callbacks.empty?
      @rocket_job_output = perform
    else
      # Allows @rocket_job_output to be modified by after/around callbacks
      run_callbacks(:perform) do
        # Allow callbacks to fail, complete or abort the job
        @rocket_job_output = perform if running?
      end
    end

    if collect_output?
      # Result must be a Hash, if not put it in a Hash
      self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {"result" => @rocket_job_output}
    end

    if new_record? || destroyed?
      complete if may_complete?
    else
      may_complete? ? complete! : save!
    end
  end
  false
end