Class: AsyncRequestReply::Worker
- Inherits:
-
Object
- Object
- AsyncRequestReply::Worker
- Defined in:
- lib/async_request_reply/worker.rb
Constant Summary collapse
- STATUS =
%i[waiting processing done unprocessable_entity internal_server_error]
- ONE_HOUR =
60*60
- LIVE_TIMEOUT =
TODO-2023-10-22: Isso limita o processamento a no máximo 1 hora.
ONE_HOUR
- @@config =
TODO-2023-10-22: Adicionar mais logs à classe.
AsyncRequestReply::Config.instance
Instance Attribute Summary collapse
-
#_ttl ⇒ Object
Returns the value of attribute _ttl.
-
#class_instance ⇒ Object
Returns the value of attribute class_instance.
-
#end_time ⇒ Object
readonly
Returns the value of attribute end_time.
-
#errors ⇒ Object
Returns the value of attribute errors.
-
#failure ⇒ Object
Returns the value of attribute failure.
-
#methods_chain ⇒ Object
Returns the value of attribute methods_chain.
-
#new_record ⇒ Object
readonly
Returns the value of attribute new_record.
-
#redirect_url ⇒ Object
Returns the value of attribute redirect_url.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
-
#status ⇒ Object
Returns the value of attribute status.
-
#status_url ⇒ Object
Returns the value of attribute status_url.
-
#success ⇒ Object
Returns the value of attribute success.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
Class Method Summary collapse
- ._find(p_uuid) ⇒ Object
- .find(p_uuid) ⇒ Object
-
.message_pack_factory ⇒ Object
TODO: Desacoplar message pack factory.
- .unpack(packer) ⇒ Object
Instance Method Summary collapse
- #async_engine ⇒ Object
- #attributes ⇒ Object
- #default_attributes ⇒ Object
-
#destroy(seconds_in = 0.seconds.to_i) ⇒ Object
Remove request from data store.
- #elapsed ⇒ Object
- #formated_erros_to_json(errors) ⇒ Object
- #id ⇒ Object
-
#initialize(attrs = {}) ⇒ Worker
constructor
A new instance of Worker.
- #new_record?(p_uuid) ⇒ Boolean
- #perform ⇒ Object
- #perform_async ⇒ Object
- #reload! ⇒ Object
- #save ⇒ Object
-
#to_msgpack ⇒ Object
Serializa a instância usando o MessagePack.
- #update(attrs) ⇒ Object
- #valid? ⇒ Boolean
- #with_async_engine(engine_class) ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Worker
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/async_request_reply/worker.rb', line 20 def initialize(attrs = {}) attrs.transform_keys(&:to_sym) @uuid = new_record?(attrs[:uuid]) ? "async_request:#{SecureRandom.uuid}" : attrs[:uuid] # INFO: Remover do repositório depois que async_request for processado # TODO-2023-10-22: Entender a relação entre número de objetos criados e # consumo de memória no host onde está o redis. Definir uma estratégia # que limite o tamanho máximo de uma instância da classe e controle do ciclo # de vida de cada instância no banco pra oferecer melhor controle pra cada caso # de uso. destroy(30.seconds.to_i) if !new_record?(attrs[:uuid]) && attrs[:status].to_sym == :done # Assigners attributes assign_attributes(default_attributes.merge(attrs)) end |
Instance Attribute Details
#_ttl ⇒ Object
Returns the value of attribute _ttl.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def _ttl @_ttl end |
#class_instance ⇒ Object
Returns the value of attribute class_instance.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def class_instance @class_instance end |
#end_time ⇒ Object
Returns the value of attribute end_time.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def end_time @end_time end |
#errors ⇒ Object
Returns the value of attribute errors.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def errors @errors end |
#failure ⇒ Object
Returns the value of attribute failure.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def failure @failure end |
#methods_chain ⇒ Object
Returns the value of attribute methods_chain.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def methods_chain @methods_chain end |
#new_record ⇒ Object (readonly)
Returns the value of attribute new_record.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def new_record @new_record end |
#redirect_url ⇒ Object
Returns the value of attribute redirect_url.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def redirect_url @redirect_url end |
#start_time ⇒ Object
Returns the value of attribute start_time.
18 19 20 |
# File 'lib/async_request_reply/worker.rb', line 18 def start_time @start_time end |
#status ⇒ Object
Returns the value of attribute status.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def status @status end |
#status_url ⇒ Object
Returns the value of attribute status_url.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def status_url @status_url end |
#success ⇒ Object
Returns the value of attribute success.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def success @success end |
#uuid ⇒ Object
Returns the value of attribute uuid.
15 16 17 |
# File 'lib/async_request_reply/worker.rb', line 15 def uuid @uuid end |
Class Method Details
._find(p_uuid) ⇒ Object
88 89 90 91 92 93 |
# File 'lib/async_request_reply/worker.rb', line 88 def self._find(p_uuid) resource = @@config.repository_adapter.get(p_uuid) return nil unless resource unpack(resource) end |
.find(p_uuid) ⇒ Object
81 82 83 84 85 86 |
# File 'lib/async_request_reply/worker.rb', line 81 def self.find(p_uuid) resource = _find(p_uuid) return nil if resource.empty? new(resource) end |
.message_pack_factory ⇒ Object
TODO: Desacoplar message pack factory
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/async_request_reply/worker.rb', line 154 def self.message_pack_factory factory = MessagePack::Factory.new @@config..each do |fac| factory.register_type( fac[:first_byte], fac[:klass], packer: fac[:packer], unpacker: fac[:unpacker], recursive: true ) end factory end |
.unpack(packer) ⇒ Object
149 150 151 |
# File 'lib/async_request_reply/worker.rb', line 149 def self.unpack(packer) message_pack_factory.load(packer) end |
Instance Method Details
#async_engine ⇒ Object
131 132 133 |
# File 'lib/async_request_reply/worker.rb', line 131 def async_engine handle_async_engine end |
#attributes ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/async_request_reply/worker.rb', line 43 def attributes { 'uuid' => uuid, 'status' => status, 'success' => success, 'failure' => failure, 'methods_chain' => methods_chain, 'class_instance' => class_instance, 'redirect_url' => redirect_url, 'start_time' => start_time, 'end_time' => end_time } end |
#default_attributes ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/async_request_reply/worker.rb', line 56 def default_attributes { methods_chain: [], 'status' => :waiting, success: { class_instance: 'self', methods_chain: [] }, failure: { class_instance: 'self', methods_chain: [] } } end |
#destroy(seconds_in = 0.seconds.to_i) ⇒ Object
Removes the request from the data store. Optionally accepts an integer number of seconds after which the request is removed; with the default of 0 it is removed immediately.
113 114 115 116 117 118 |
# File 'lib/async_request_reply/worker.rb', line 113 def destroy(seconds_in = 0.seconds.to_i) return @@config.repository_adapter.del(id) if seconds_in.zero? self._ttl = seconds_in save end |
#elapsed ⇒ Object
95 96 97 |
# File 'lib/async_request_reply/worker.rb', line 95 def elapsed (@end_time || Process.clock_gettime(Process::CLOCK_MONOTONIC)) - @start_time end |
#formated_erros_to_json(errors) ⇒ Object
228 229 230 231 232 233 234 235 236 |
# File 'lib/async_request_reply/worker.rb', line 228 def formated_erros_to_json(errors) resouce = if errors.respond_to?(:map) errors.map { |title, error| { title: title, detail: error } } else [{ title: errors }] end resouce.map { |error| error.select { |_k, v| !v.nil? && !(v.respond_to?(:empty?) && v.empty?) } } end |
#id ⇒ Object
77 78 79 |
# File 'lib/async_request_reply/worker.rb', line 77 def id uuid end |
#new_record?(p_uuid) ⇒ Boolean
71 72 73 74 75 |
# File 'lib/async_request_reply/worker.rb', line 71 def new_record?(p_uuid) return true if p_uuid.nil? @@config.repository_adapter.get(p_uuid).nil? end |
#perform ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/async_request_reply/worker.rb', line 184 def perform begin @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@config.logger.info("Start perform worker #{self.uuid}") raise "Can't update worker while it's performing" unless update(status: :processing) if element = MethodsChain.run_methods_chain(class_instance, methods_chain) @@config.logger.info("successful workflow perform worker #{self.uuid}") klass_after = success[:class_instance] == 'self' ? element : success[:class_instance] methods_after = success[:methods_chain] result = MethodsChain.run_methods_chain(klass_after, methods_after) @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) raise "Can't update worker while it's performing" unless update(status: :done) @@config.logger.info("Done perform worker #{self.uuid}") result else @@config.logger.error("failure workflow perform worker #{self.uuid}") klass_reject_after = failure[:class_instance] == 'self' ? element : failure[:class_instance] methods_reject_after = failure[:methods_chain] result = MethodsChain.run_methods_chain(klass_reject_after,methods_reject_after) @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) raise "Can't update worker while it's performing" unless update( status: :unprocessable_entity, errors: formated_erros_to_json(result)) @@config.logger.error("Done perform worker #{self.uuid} with fails #{formated_erros_to_json(result)}") result end rescue StandardError => e @@config.logger.fatal("Fatal perform worker #{self.uuid} with fails #{formated_erros_to_json(e.message)}") @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) update(status: :internal_server_error, errors: formated_erros_to_json(e.message)) nil end end |
#perform_async ⇒ Object
126 127 128 129 |
# File 'lib/async_request_reply/worker.rb', line 126 def perform_async save handle_async_engine.perform_async(id) end |
#reload! ⇒ Object
105 106 107 |
# File 'lib/async_request_reply/worker.rb', line 105 def reload! assign_attributes(self.class._find(self.uuid)) end |
#save ⇒ Object
120 121 122 123 124 |
# File 'lib/async_request_reply/worker.rb', line 120 def save return nil unless valid? attributes = self.class.unpack(@@config.repository_adapter.setex(uuid, (_ttl || LIVE_TIMEOUT), to_msgpack)) assign_attributes(attributes) end |
#to_msgpack ⇒ Object
Serializa a instância usando o MessagePack. Além de ser mais rápido e menor que JSON, é uma boa opção para serializar arquivos. Ref.: msgpack.org/ Ref.: github.com/msgpack/msgpack-ruby#extension-types
145 146 147 |
# File 'lib/async_request_reply/worker.rb', line 145 def to_msgpack self.class.message_pack_factory.dump(attributes.as_json) end |
#update(attrs) ⇒ Object
100 101 102 103 |
# File 'lib/async_request_reply/worker.rb', line 100 def update(attrs) assign_attributes(attrs) save end |
#valid? ⇒ Boolean
36 37 38 39 40 41 |
# File 'lib/async_request_reply/worker.rb', line 36 def valid? @errors = [] @errors << "class_instance can't be blank." if class_instance.nil? @errors.empty? end |
#with_async_engine(engine_class) ⇒ Object
135 136 137 138 |
# File 'lib/async_request_reply/worker.rb', line 135 def with_async_engine(engine_class) @handle_async_engine = engine_class self end |