Class: AsyncRequestReply::Worker
- Inherits:
-
Object
- Object
- AsyncRequestReply::Worker
- Defined in:
- lib/async_request_reply/worker.rb
Constant Summary collapse
- STATUS =
%i[waiting processing done unprocessable_entity internal_server_error]
- ONE_HOUR =
60*60
- LIVE_TIMEOUT =
TODO-2023-10-22: Isso limita o processamento a no máximo 1 hora.
ONE_HOUR
- @@config =
TODO-2023-10-22: Adicionar mais logs à classe.
AsyncRequestReply::Config.instance
Instance Attribute Summary collapse
-
#_ttl ⇒ Object
Returns the value of attribute _ttl.
-
#class_instance ⇒ Object
Returns the value of attribute class_instance.
-
#end_time ⇒ Object
readonly
Returns the value of attribute end_time.
-
#errors ⇒ Object
Returns the value of attribute errors.
-
#failure ⇒ Object
Returns the value of attribute failure.
-
#methods_chain ⇒ Object
Returns the value of attribute methods_chain.
-
#new_record ⇒ Object
readonly
Returns the value of attribute new_record.
-
#raise_error ⇒ Object
Returns the value of attribute raise_error.
-
#redirect_url ⇒ Object
Returns the value of attribute redirect_url.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
-
#status ⇒ Object
Returns the value of attribute status.
-
#status_url ⇒ Object
Returns the value of attribute status_url.
-
#success ⇒ Object
Returns the value of attribute success.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
Class Method Summary collapse
- ._find(p_uuid) ⇒ Object
- .find(p_uuid) ⇒ Object
-
.message_pack_factory ⇒ Object
TODO: Desacoplar message pack factory.
- .unpack(packer) ⇒ Object
Instance Method Summary collapse
- #async_engine ⇒ Object
- #attributes ⇒ Object
- #default_attributes ⇒ Object
-
#destroy(seconds_in = 0.seconds.to_i) ⇒ Object
Remove request from data store.
- #elapsed ⇒ Object
- #formated_erros_to_json(errors) ⇒ Object
- #id ⇒ Object
-
#initialize(attrs = {}) ⇒ Worker
constructor
A new instance of Worker.
- #new_record?(p_uuid) ⇒ Boolean
- #perform ⇒ Object
- #perform_async ⇒ Object
- #reload! ⇒ Object
- #save ⇒ Object
-
#to_msgpack ⇒ Object
Serializa a instância usando o MessagePack.
- #update(attrs) ⇒ Object
- #valid? ⇒ Boolean
- #with_async_engine(engine_class) ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Worker
Returns a new instance of Worker.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/async_request_reply/worker.rb', line 20 def initialize(attrs = {}) attrs.transform_keys(&:to_sym) @uuid = new_record?(attrs[:uuid]) ? "async_request:#{SecureRandom.uuid}" : attrs[:uuid] # INFO: Remover do repositório depois que async_request for processado # TODO-2023-10-22: Entender a relação entre número de objetos criados e # consulmo de mémoria no host onde está o redis. Definir uma estrátegia # que limite o tamanho máximo de uma instância da classe e controle do ciclo # de vida de cada instancia no banco pra ofecer melhor controle pra cada caso # de uso. destroy(30.seconds.to_i) if !new_record?(attrs[:uuid]) && attrs[:status].to_sym == :done # Assigners attributes assign_attributes(default_attributes.merge(attrs)) end |
Instance Attribute Details
#_ttl ⇒ Object
Returns the value of attribute _ttl.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def _ttl @_ttl end |
#class_instance ⇒ Object
Returns the value of attribute class_instance.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def class_instance @class_instance end |
#end_time ⇒ Object
Returns the value of attribute end_time.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def end_time @end_time end |
#errors ⇒ Object
Returns the value of attribute errors.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def errors @errors end |
#failure ⇒ Object
Returns the value of attribute failure.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def failure @failure end |
#methods_chain ⇒ Object
Returns the value of attribute methods_chain.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def methods_chain @methods_chain end |
#new_record ⇒ Object (readonly)
Returns the value of attribute new_record.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def new_record @new_record end |
#raise_error ⇒ Object
Returns the value of attribute raise_error.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def raise_error @raise_error end |
#redirect_url ⇒ Object
Returns the value of attribute redirect_url.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def redirect_url @redirect_url end |
#start_time ⇒ Object
Returns the value of attribute start_time.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def start_time @start_time end |
#status ⇒ Object
Returns the value of attribute status.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def status @status end |
#status_url ⇒ Object
Returns the value of attribute status_url.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def status_url @status_url end |
#success ⇒ Object
Returns the value of attribute success.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def success @success end |
#uuid ⇒ Object
Returns the value of attribute uuid.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def uuid @uuid end |
Class Method Details
._find(p_uuid) ⇒ Object
90 91 92 93 94 95 |
# File 'lib/async_request_reply/worker.rb', line 90 def self._find(p_uuid) resource = @@config.repository_adapter.get(p_uuid) return nil unless resource unpack(resource) end |
.find(p_uuid) ⇒ Object
83 84 85 86 87 88 |
# File 'lib/async_request_reply/worker.rb', line 83 def self.find(p_uuid) resource = _find(p_uuid) return nil if resource.empty? new(resource) end |
.message_pack_factory ⇒ Object
TODO: Desacoplar message pack factory
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/async_request_reply/worker.rb', line 156 def self.message_pack_factory factory = MessagePack::Factory.new @@config..each do |fac| factory.register_type( fac[:first_byte], fac[:klass], packer: fac[:packer], unpacker: fac[:unpacker], recursive: true ) end factory end |
.unpack(packer) ⇒ Object
151 152 153 |
# File 'lib/async_request_reply/worker.rb', line 151 def self.unpack(packer) message_pack_factory.load(packer) end |
Instance Method Details
#async_engine ⇒ Object
133 134 135 |
# File 'lib/async_request_reply/worker.rb', line 133 def async_engine handle_async_engine end |
#attributes ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/async_request_reply/worker.rb', line 43 def attributes { 'uuid' => uuid, 'status' => status, 'success' => success, 'failure' => failure, 'methods_chain' => methods_chain, 'class_instance' => class_instance, 'redirect_url' => redirect_url, 'start_time' => start_time, 'end_time' => end_time, 'raise_error' => raise_error } end |
#default_attributes ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/async_request_reply/worker.rb', line 57 def default_attributes { raise_error: true, methods_chain: [], 'status' => :waiting, success: { class_instance: 'self', methods_chain: [] }, failure: { class_instance: 'self', methods_chain: [] } } end |
#destroy(seconds_in = 0.seconds.to_i) ⇒ Object
Removes the request from the data store. Optionally pass an integer number of seconds after which the request should be removed; with the default of 0 it is deleted immediately.
115 116 117 118 119 120 |
# File 'lib/async_request_reply/worker.rb', line 115 def destroy(seconds_in = 0.seconds.to_i) return @@config.repository_adapter.del(id) if seconds_in.zero? self._ttl = seconds_in save end |
#elapsed ⇒ Object
97 98 99 |
# File 'lib/async_request_reply/worker.rb', line 97 def elapsed (@end_time || Process.clock_gettime(Process::CLOCK_MONOTONIC)) - @start_time end |
#formated_erros_to_json(errors) ⇒ Object
231 232 233 234 235 236 237 238 239 |
# File 'lib/async_request_reply/worker.rb', line 231 def formated_erros_to_json(errors) resouce = if errors.respond_to?(:map) errors.map { |title, error| { title: title, detail: error } } else [{ title: errors }] end resouce.map { |error| error.select { |_k, v| !v.nil? && !(v.respond_to?(:empty?) && v.empty?) } } end |
#id ⇒ Object
79 80 81 |
# File 'lib/async_request_reply/worker.rb', line 79 def id uuid end |
#new_record?(p_uuid) ⇒ Boolean
73 74 75 76 77 |
# File 'lib/async_request_reply/worker.rb', line 73 def new_record?(p_uuid) return true if p_uuid.nil? @@config.repository_adapter.get(p_uuid).nil? end |
#perform ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/async_request_reply/worker.rb', line 186 def perform begin @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@config.logger.info("Start perform worker #{self.uuid}") raise "Can't update worker while it's performing" unless update(status: :processing) if element = MethodsChain.run_methods_chain(class_instance, methods_chain) @@config.logger.info("successful workflow perform worker #{self.uuid}") klass_after = success[:class_instance] == 'self' ? element : success[:class_instance] methods_after = success[:methods_chain] result = MethodsChain.run_methods_chain(klass_after, methods_after) @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) raise "Can't update worker while it's performing" unless update(status: :done) @@config.logger.info("Done perform worker #{self.uuid}") result else @@config.logger.error("failure workflow perform worker #{self.uuid}") klass_reject_after = failure[:class_instance] == 'self' ? element : failure[:class_instance] methods_reject_after = failure[:methods_chain] result = MethodsChain.run_methods_chain(klass_reject_after,methods_reject_after) @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) raise "Can't update worker while it's performing" unless update( status: :unprocessable_entity, errors: formated_erros_to_json(result)) @@config.logger.error("Done perform worker #{self.uuid} with fails #{formated_erros_to_json(result)}") result end rescue StandardError => e @@config.logger.fatal("Fatal perform worker #{self.uuid} with fails #{formated_erros_to_json(e.message)}") @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) update(status: :internal_server_error, errors: formated_erros_to_json(e.message)) raise e if @raise_error nil end end |
#perform_async ⇒ Object
128 129 130 131 |
# File 'lib/async_request_reply/worker.rb', line 128 def perform_async save handle_async_engine.perform_async(id) end |
#reload! ⇒ Object
107 108 109 |
# File 'lib/async_request_reply/worker.rb', line 107 def reload! assign_attributes(self.class._find(self.uuid)) end |
#save ⇒ Object
122 123 124 125 126 |
# File 'lib/async_request_reply/worker.rb', line 122 def save return nil unless valid? attributes = self.class.unpack(@@config.repository_adapter.setex(uuid, (_ttl || LIVE_TIMEOUT), to_msgpack)) assign_attributes(attributes) end |
#to_msgpack ⇒ Object
Serializa a instância usando o MessagePack. Além de ser mais rápido e menor que JSON, é uma boa opção para serializar arquivos. Ref.: msgpack.org/ Ref.: github.com/msgpack/msgpack-ruby#extension-types
147 148 149 |
# File 'lib/async_request_reply/worker.rb', line 147 def to_msgpack self.class.message_pack_factory.dump(attributes.as_json) end |
#update(attrs) ⇒ Object
102 103 104 105 |
# File 'lib/async_request_reply/worker.rb', line 102 def update(attrs) assign_attributes(attrs) save end |
#valid? ⇒ Boolean
36 37 38 39 40 41 |
# File 'lib/async_request_reply/worker.rb', line 36 def valid? @errors = [] @errors << "class_instance can't be blank." if class_instance.nil? @errors.empty? end |
#with_async_engine(engine_class) ⇒ Object
137 138 139 140 |
# File 'lib/async_request_reply/worker.rb', line 137 def with_async_engine(engine_class) @handle_async_engine = engine_class self end |