Class: ParallelWorkforce::Job::Util::Performer
- Inherits:
- Object
- Defined in:
- lib/parallel_workforce/job/util/performer.rb
Instance Attribute Summary
- #actor_class_name ⇒ Object (readonly): Returns the value of attribute actor_class_name.
- #index ⇒ Object (readonly): Returns the value of attribute index.
- #result_key ⇒ Object (readonly): Returns the value of attribute result_key.
- #server_revision ⇒ Object (readonly): Returns the value of attribute server_revision.
Class Method Summary
- .parallel_workforce_thread? ⇒ Boolean
- .perform(actor_class, actor_args) ⇒ Object
Instance Method Summary
- #initialize(actor_class_name:, result_key:, index:, server_revision:, serialized_actor_args:) ⇒ Performer (constructor): A new instance of Performer.
- #perform ⇒ Object: Performs the actor when server_revision matches the worker revision, then always pushes the marshaled result to result_key in Redis.
Constructor Details
#initialize(actor_class_name:, result_key:, index:, server_revision:, serialized_actor_args:) ⇒ Performer
Returns a new instance of Performer.
# File 'lib/parallel_workforce/job/util/performer.rb', line 25

def initialize(actor_class_name:, result_key:, index:, server_revision:, serialized_actor_args:)
  @actor_class_name = actor_class_name
  @result_key = result_key
  @index = index
  @server_revision = server_revision
  @serialized_actor_args = serialized_actor_args
end
Instance Attribute Details
#actor_class_name ⇒ Object (readonly)
Returns the value of attribute actor_class_name.
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

def actor_class_name
  @actor_class_name
end
#index ⇒ Object (readonly)
Returns the value of attribute index.
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

def index
  @index
end
#result_key ⇒ Object (readonly)
Returns the value of attribute result_key.
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

def result_key
  @result_key
end
#server_revision ⇒ Object (readonly)
Returns the value of attribute server_revision.
# File 'lib/parallel_workforce/job/util/performer.rb', line 5

def server_revision
  @server_revision
end
Class Method Details
.parallel_workforce_thread? ⇒ Boolean
# File 'lib/parallel_workforce/job/util/performer.rb', line 20

def parallel_workforce_thread?
  !!Thread.current[:parallel_workforce_thread]
end
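The predicate only reads a key on the current thread. The sketch below illustrates how such a flag behaves, assuming that worker code sets `Thread.current[:parallel_workforce_thread]` before running an actor. The listing above does not show where the key is set, and `flagged?` is a hypothetical stand-in for the class method.

```ruby
# Hypothetical stand-in for Performer.parallel_workforce_thread?
def flagged?
  !!Thread.current[:parallel_workforce_thread]
end

# Assumption: a worker thread sets the key before performing an actor.
inside = Thread.new do
  Thread.current[:parallel_workforce_thread] = true
  flagged?
end.value

# The main thread never set the key, so the double negation turns nil into false.
outside = flagged?

puts "inside worker thread: #{inside}"
puts "outside worker thread: #{outside}"
```

Because the key lives on the thread that set it, code running elsewhere in the same process sees `false`.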
.perform(actor_class, actor_args) ⇒ Object
# File 'lib/parallel_workforce/job/util/performer.rb', line 14

def perform(actor_class, actor_args)
  actor_args = actor_args.each_with_object({}) { |(k, v), result| result[k.to_sym] = v }

  (actor_args.empty? ? actor_class.new : actor_class.new(**actor_args)).perform
end
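A minimal sketch of the same logic outside the gem may help: keys are converted to symbols so they can be splatted as keyword arguments, and an empty hash falls back to a plain `new`. `SumActor`, `PingActor`, and `perform_actor_sketch` are hypothetical names for illustration. The string keys assume arguments that have been through a serializer that does not preserve symbols.

```ruby
# Hypothetical actor: any class with a keyword-argument #initialize and a #perform fits.
class SumActor
  def initialize(a:, b:)
    @a = a
    @b = b
  end

  def perform
    @a + @b
  end
end

# Hypothetical actor that takes no arguments.
class PingActor
  def perform
    :pong
  end
end

# Same logic as the class method above, written as a free-standing method.
def perform_actor_sketch(actor_class, actor_args)
  # Symbolize keys so the hash can be passed as keyword arguments.
  actor_args = actor_args.each_with_object({}) { |(k, v), result| result[k.to_sym] = v }

  # With no arguments, call new without a splat.
  (actor_args.empty? ? actor_class.new : actor_class.new(**actor_args)).perform
end

sum = perform_actor_sketch(SumActor, { "a" => 1, "b" => 2 })
pong = perform_actor_sketch(PingActor, {})

puts sum
puts pong
```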
Instance Method Details
#perform ⇒ Object
Performs the actor when server_revision matches the worker revision and stores the serialized return value under :serialized_value. A revision mismatch or a ParallelWorkforce::Error is recorded under :error_revision, and any other StandardError under :error. The result, merged with index, is always pushed to result_key in Redis so that the subscriber does not time out.

# File 'lib/parallel_workforce/job/util/performer.rb', line 35

def perform
  result = {}

  if server_revision == worker_revision
    begin
      result[:serialized_value] = serialize(perform_actor)
    rescue ParallelWorkforce::Error => e
      warn("#{self.class}: Actor revision error: #{e}")
      result[:error_revision] = worker_revision
    rescue => e
      warn("#{self.class}: Error in actor perform: #{e}", *e.backtrace)
      result[:error] = "Error in actor perform. #{e.class} #{e.message}"
    end
  else
    warn("#{self.class}: Revision mismatch from caller")
    result[:error_revision] = worker_revision
  end

  result
rescue Exception => exception
  result = handle_exception(exception)
ensure
  ParallelWorkforce.configuration.redis_connector.with do |redis|
    # always publish a message result to avoid a Timeout in subscriber
    # NOTE: always using Ruby marshaling to store in Redis, not serializer
    redis.rpush(result_key, Marshal.dump(result.merge(index: index)))
  end
end
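The payload pushed in the ensure block can be sketched as a Marshal round trip. This is an illustration only: a plain Array stands in for the Redis list at `result_key`, the values are made up, and the consumer-side branching is an assumption about how a subscriber might read the keys rather than the gem's own code.

```ruby
# Hypothetical stand-in for the Redis list at result_key.
queue = []

# Made-up values for illustration.
index = 2
result = { serialized_value: "42" }

# Producer side, mirroring the ensure block: marshal the result merged with its index.
queue.push(Marshal.dump(result.merge(index: index)))

# Consumer side (assumed): load the payload and branch on which key is present.
payload = Marshal.load(queue.shift)

status =
  if payload.key?(:error)
    :failed
  elsif payload.key?(:error_revision)
    :revision_mismatch
  else
    :ok
  end

puts payload.inspect
puts status
```

Carrying `index` in every payload lets a consumer match results to actors regardless of the order in which workers finish.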