Class: ParallelWorkforce::Job::Util::Performer

Inherits:
Object
  • Object
show all
Defined in:
lib/parallel_workforce/job/util/performer.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(actor_class_name:, result_key:, index:, server_revision:, serialized_actor_args:) ⇒ Performer

Returns a new instance of Performer.



25
26
27
28
29
30
31
# File 'lib/parallel_workforce/job/util/performer.rb', line 25

# Builds a performer from the job payload. Every argument is a required
# keyword and is stored unchanged in the instance variable of the same name.
#
# @param actor_class_name [Object] stored in +@actor_class_name+
# @param result_key [Object] key that #perform passes to +redis.rpush+
# @param index [Object] value that #perform merges into the published result under +:index+
# @param server_revision [Object] revision that #perform compares against +worker_revision+
# @param serialized_actor_args [Object] stored in +@serialized_actor_args+
def initialize(actor_class_name:, result_key:, index:, server_revision:, serialized_actor_args:)
  # What to run and with which (still serialized) arguments.
  @actor_class_name, @serialized_actor_args = actor_class_name, serialized_actor_args
  # Where the result is published and how it is tagged.
  @result_key, @index = result_key, index
  # Revision supplied by the caller.
  @server_revision = server_revision
end

Instance Attribute Details

#actor_class_name ⇒ Object (readonly)

Returns the value of attribute actor_class_name.



5
6
7
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

# Reader for +@actor_class_name+, which the constructor sets from its
# +actor_class_name:+ keyword argument.
#
# @return [Object] the stored value, returned as-is
def actor_class_name
  @actor_class_name
end

#index ⇒ Object (readonly)

Returns the value of attribute index.



5
6
7
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

# Reader for +@index+, which the constructor sets from its +index:+ keyword
# argument. #perform merges this value into the published result under +:index+.
#
# @return [Object] the stored value, returned as-is
def index
  @index
end

#result_key ⇒ Object (readonly)

Returns the value of attribute result_key.



5
6
7
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

# Reader for +@result_key+, which the constructor sets from its +result_key:+
# keyword argument. #perform uses it as the key passed to +redis.rpush+.
#
# @return [Object] the stored value, returned as-is
def result_key
  @result_key
end

#server_revision ⇒ Object (readonly)

Returns the value of attribute server_revision.



5
6
7
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

# Reader for +@server_revision+, which the constructor sets from its
# +server_revision:+ keyword argument. #perform compares it with
# +worker_revision+ before running the actor.
#
# @return [Object] the stored value, returned as-is
def server_revision
  @server_revision
end

Class Method Details

.parallel_workforce_thread? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/parallel_workforce/job/util/performer.rb', line 20

# Reports whether the value stored under +:parallel_workforce_thread+ on
# +Thread.current+ is truthy.
#
# @return [Boolean] +true+ for any truthy stored value, +false+ for +nil+/+false+
def parallel_workforce_thread?
  Thread.current[:parallel_workforce_thread] ? true : false
end

.perform(actor_class, actor_args) ⇒ Object



14
15
16
17
18
# File 'lib/parallel_workforce/job/util/performer.rb', line 14

# Instantiates +actor_class+ with +actor_args+ as keyword arguments and
# returns the result of calling +perform+ on that instance.
#
# @param actor_class [Class] class whose instances respond to +perform+
# @param actor_args [Hash, Enumerable] key/value pairs; every key is
#   converted with +to_sym+ before being passed as a keyword argument
# @return [Object] whatever the actor's +perform+ returns
def perform(actor_class, actor_args)
  keyword_args = actor_args.map { |key, value| [key.to_sym, value] }.to_h

  # With no arguments, call +new+ bare instead of splatting an empty hash.
  actor =
    if keyword_args.empty?
      actor_class.new
    else
      actor_class.new(**keyword_args)
    end

  actor.perform
end

Instance Method Details

#perform ⇒ Object

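Runs the actor when the caller's revision matches the worker's, and always pushes the marshaled result (tagged with index) onto the Redis list at result_key. The source method carries the directives rubocop:disable Metrics/MethodLength and rubocop:disable Metrics/AbcSize.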



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/parallel_workforce/job/util/performer.rb', line 35

# Runs the actor (when revisions agree) and publishes the outcome to Redis.
#
# Outcomes, as built by this method:
# - +server_revision+ differs from +worker_revision+: +{ error_revision: worker_revision }+
# - actor ran and its value serialized: +{ serialized_value: ... }+
# - +ParallelWorkforce::Error+ raised: +{ error_revision: worker_revision }+
# - any other +StandardError+ raised: +{ error: "Error in actor perform. ..." }+
# - any other +Exception+: whatever +handle_exception+ returns
#
# Whatever the outcome, the +ensure+ clause pushes +result+ merged with
# +index+ onto the Redis list at +result_key+.
#
# NOTE(review): the +ensure+ clause calls +result.merge+, so +handle_exception+
# is presumably expected to return a Hash — confirm against its definition.
#
# @return [Hash] the outcome hash described above (without +:index+)
def perform
  result = {}

  # Guard: a worker on a different revision must not run the actor.
  unless server_revision == worker_revision
    warn("#{self.class}: Revision mismatch from caller")
    result[:error_revision] = worker_revision
    return result
  end

  begin
    actor_value = perform_actor
    result[:serialized_value] = serialize(actor_value)
  rescue ParallelWorkforce::Error => revision_error
    warn("#{self.class}: Actor revision error: #{revision_error}")
    result[:error_revision] = worker_revision
  rescue => actor_error
    warn("#{self.class}: Error in actor perform: #{actor_error}", *actor_error.backtrace)
    result[:error] = "Error in actor perform. #{actor_error.class} #{actor_error.message}"
  end

  result
rescue Exception => exception
  # Deliberately broad: non-StandardError failures are delegated to handle_exception.
  result = handle_exception(exception)
ensure
  ParallelWorkforce.configuration.redis_connector.with do |redis|
    # A message is pushed on every path so the subscriber does not hit a Timeout.
    # The payload always uses Ruby marshaling for Redis storage, not the serializer.
    payload = Marshal.dump(result.merge(index: index))
    redis.rpush(result_key, payload)
  end
end