Class: AsyncRequestReply::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/async_request_reply/worker.rb

Constant Summary collapse

STATUS =
%i[waiting processing done unprocessable_entity internal_server_error]
ONE_HOUR =
60*60
LIVE_TIMEOUT =

TODO-2023-10-22: Isso limita o processamento a no máximo 1 hora.

ONE_HOUR
@@config =

TODO-2023-10-22: Adicionar mais logs à classe.

AsyncRequestReply::Config.instance

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Worker



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/async_request_reply/worker.rb', line 20

# Builds a worker from a hash of attributes.
#
# Keys may be strings or symbols; they are normalised to symbols before being
# read. When +attrs[:uuid]+ is missing, or the repository holds nothing under
# it (see #new_record?), a fresh "async_request:<uuid>" identifier is
# generated; otherwise the supplied uuid is kept.
#
# @param attrs [Hash] initial attribute values, merged over #default_attributes
def initialize(attrs = {})
  # transform_keys returns a new hash; the result has to be kept, otherwise
  # string-keyed input (e.g. an unpacked payload) is never normalised.
  attrs = attrs.transform_keys(&:to_sym)

  # Ask the repository once and reuse the answer for both decisions below.
  fresh = new_record?(attrs[:uuid])
  @uuid = fresh ? "async_request:#{SecureRandom.uuid}" : attrs[:uuid]

  # INFO: remove the record from the repository once the async_request has
  # been processed.
  # TODO-2023-10-22: understand the relation between the number of objects
  # created and memory consumption on the host running redis. Define a
  # strategy that caps the maximum size of an instance and controls the
  # life cycle of each instance in the store, to give better control for
  # each use case.
  # `&.` keeps a stored record without a :status from raising NoMethodError.
  # NOTE(review): this runs before assign_attributes, so class_instance is
  # still unset and the save inside #destroy is rejected by #valid?; only
  # _ttl is changed here - confirm whether that ordering is intended.
  destroy(30.seconds.to_i) if !fresh && attrs[:status]&.to_sym == :done

  # Caller-supplied values win over the defaults.
  assign_attributes(default_attributes.merge(attrs))
end

Instance Attribute Details

#_ttlObject

Returns the value of attribute _ttl.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the expiry handed to the repository adapter's +setex+ by #save
# (which falls back to LIVE_TIMEOUT when this is nil). #destroy assigns it
# when a delayed removal is requested.
def _ttl
  @_ttl
end

#class_instanceObject

Returns the value of attribute class_instance.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the target of the main workflow: #valid? requires it to be
# non-nil, and #perform passes it as the first argument to
# MethodsChain.run_methods_chain.
def class_instance
  @class_instance
end

#end_timeObject

Returns the value of attribute end_time.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

# Reader for the monotonic-clock reading stored by #perform when a run ends
# (success branch, failure branch or rescued exception). nil until then.
def end_time
  @end_time
end

#errorsObject

Returns the value of attribute errors.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

# Reader for the error list. #valid? replaces it with a fresh array of
# validation messages every time it runs.
def errors
  @errors
end

#failureObject

Returns the value of attribute failure.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the failure callback description. #perform reads its
# :class_instance and :methods_chain entries when the main chain returns a
# falsy value; #default_attributes seeds it with
# { class_instance: 'self', methods_chain: [] }.
def failure
  @failure
end

#methods_chainObject

Returns the value of attribute methods_chain.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the main chain that #perform hands to
# MethodsChain.run_methods_chain together with #class_instance.
# #default_attributes seeds it with an empty array.
def methods_chain
  @methods_chain
end

#new_recordObject (readonly)

Returns the value of attribute new_record.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

# Read-only accessor for @new_record.
# NOTE(review): nothing in the visible code assigns @new_record, so this
# looks like it always returns nil - confirm against the rest of the class.
def new_record
  @new_record
end

#redirect_urlObject

Returns the value of attribute redirect_url.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the redirect URL. The value is included in #attributes and is
# therefore part of the serialized record.
def redirect_url
  @redirect_url
end

#start_timeObject

Returns the value of attribute start_time.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

# Reader for the monotonic-clock reading stored by #perform when a run
# starts; #elapsed subtracts it from the end (or current) reading.
def start_time
  @start_time
end

#statusObject

Returns the value of attribute status.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the workflow status. #default_attributes starts it at :waiting;
# #perform moves it to :processing and then to :done, :unprocessable_entity
# or :internal_server_error (the values listed in STATUS).
def status
  @status
end

#status_urlObject

Returns the value of attribute status_url.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the status URL. It is not listed in #attributes, so the
# serialization done by #to_msgpack does not include it.
def status_url
  @status_url
end

#successObject

Returns the value of attribute success.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the success callback description. #perform reads its
# :class_instance and :methods_chain entries when the main chain returns a
# truthy value; #default_attributes seeds it with
# { class_instance: 'self', methods_chain: [] }.
def success
  @success
end

#uuidObject

Returns the value of attribute uuid.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

# Reader for the worker identifier. #initialize either keeps the supplied
# value or generates "async_request:<SecureRandom.uuid>"; #save uses it as
# the key passed to the repository adapter.
def uuid
  @uuid
end

Class Method Details

._find(p_uuid) ⇒ Object



88
89
90
91
92
93
# File 'lib/async_request_reply/worker.rb', line 88

# Fetches the raw record stored under +p_uuid+ and decodes it.
#
# @param p_uuid key to look up through the configured repository adapter
# @return the value produced by .unpack, or nil when the adapter returns
#   nothing for the key
def self._find(p_uuid)
  stored = @@config.repository_adapter.get(p_uuid)

  stored ? unpack(stored) : nil
end

.find(p_uuid) ⇒ Object



81
82
83
84
85
86
# File 'lib/async_request_reply/worker.rb', line 81

# Looks up a stored worker by its identifier and rebuilds it.
#
# @param p_uuid identifier the record was stored under
# @return a new instance built from the stored attributes, or nil when
#   nothing is stored under +p_uuid+ or the stored payload is empty
def self.find(p_uuid)
  resource = _find(p_uuid)
  # _find answers nil for an unknown key; calling empty? on it would raise
  # NoMethodError, so the missing case is checked first.
  return nil if resource.nil? || resource.empty?

  new(resource)
end

.message_pack_factoryObject

TODO: Desacoplar message pack factory



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/async_request_reply/worker.rb', line 154

# Builds a MessagePack factory with every extension type listed in
# @@config.message_packer_factories registered on it.
#
# Each configured entry is read through the keys :first_byte, :klass,
# :packer and :unpacker; all types are registered with recursive: true.
#
# TODO: decouple the message pack factory.
#
# @return [MessagePack::Factory] a new factory on every call
def self.message_pack_factory
  MessagePack::Factory.new.tap do |factory|
    @@config.message_packer_factories.each do |definition|
      factory.register_type(
        definition[:first_byte],
        definition[:klass],
        packer: definition[:packer],
        unpacker: definition[:unpacker],
        recursive: true
      )
    end
  end
end

.unpack(packer) ⇒ Object



149
150
151
# File 'lib/async_request_reply/worker.rb', line 149

# Decodes a packed payload with the factory from .message_pack_factory.
#
# @param packer the serialized data to load
# @return whatever the factory's +load+ returns for +packer+
def self.unpack(packer)
  factory = message_pack_factory
  factory.load(packer)
end

Instance Method Details

#async_engineObject



131
132
133
# File 'lib/async_request_reply/worker.rb', line 131

# Returns whatever +handle_async_engine+ returns.
# NOTE(review): handle_async_engine is not defined in the visible part of
# this file; presumably it resolves the engine chosen through
# #with_async_engine - confirm.
def async_engine
  handle_async_engine
end

#attributesObject



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/async_request_reply/worker.rb', line 43

# Snapshot of the serializable state, keyed by attribute name (String).
#
# The keys, in order, are: uuid, status, success, failure, methods_chain,
# class_instance, redirect_url, start_time and end_time. Each value is read
# through the reader of the same name.
#
# @return [Hash{String => Object}]
def attributes
  %w[
    uuid status success failure methods_chain
    class_instance redirect_url start_time end_time
  ].each_with_object({}) do |name, snapshot|
    snapshot[name] = send(name)
  end
end

#default_attributesObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/async_request_reply/worker.rb', line 56

# Default values merged under the caller's attributes in #initialize.
#
# Note that the status default is stored under the String key 'status'
# while every other key is a Symbol.
#
# @return [Hash] a freshly built hash (nested hashes/arrays are new objects
#   on every call)
def default_attributes
  # Built through a lambda so success and failure never share objects.
  callback_defaults = -> { { class_instance: 'self', methods_chain: [] } }

  defaults = { methods_chain: [] }
  defaults['status'] = :waiting
  defaults[:success] = callback_defaults.call
  defaults[:failure] = callback_defaults.call
  defaults
end

#destroy(seconds_in = 0.seconds.to_i) ⇒ Object

Removes the request from the data store. An integer may be passed to set how many seconds from now the request should be removed; with the default of 0 it is removed immediately.



113
114
115
116
117
118
# File 'lib/async_request_reply/worker.rb', line 113

# Removes the request from the data store, immediately or after a delay.
#
# With a zero delay the key is deleted through the repository adapter.
# Otherwise the delay becomes the record's _ttl and the record is saved
# again, so the store expires it.
#
# @param seconds_in [Integer] seconds until removal; 0 removes right away
# @return the adapter's +del+ result for an immediate removal, otherwise the
#   result of #save
def destroy(seconds_in = 0.seconds.to_i)
  if seconds_in.zero?
    @@config.repository_adapter.del(id)
  else
    self._ttl = seconds_in
    save
  end
end

#elapsedObject



95
96
97
# File 'lib/async_request_reply/worker.rb', line 95

# Time spent on the run, as the difference between two readings of
# Process::CLOCK_MONOTONIC.
#
# Uses @end_time when the run has finished and the current clock reading
# while it is still in progress.
#
# @return [Numeric, nil] elapsed time, or nil when the worker has not
#   started yet (no @start_time)
def elapsed
  # A worker that never performed has no start reading; subtracting nil
  # would raise TypeError.
  return nil if @start_time.nil?

  finished_at = @end_time || Process.clock_gettime(Process::CLOCK_MONOTONIC)
  finished_at - @start_time
end

#formated_erros_to_json(errors) ⇒ Object



228
229
230
231
232
233
234
235
236
# File 'lib/async_request_reply/worker.rb', line 228

# Normalises an error value into a list of { title:, detail: } hashes.
#
# Anything that responds to +map+ is mapped pairwise: each yielded
# (title, detail) pair becomes one entry. Any other value becomes a single
# entry whose title is the value itself. Keys whose value is nil or empty
# are then dropped from every entry.
#
# @param errors the error(s) to format
# @return [Array<Hash>] one hash per error, without blank values
def formated_erros_to_json(errors)
  entries =
    if errors.respond_to?(:map)
      errors.map { |title, detail| { title: title, detail: detail } }
    else
      [{ title: errors }]
    end

  entries.map do |entry|
    entry.reject { |_key, value| value.nil? || (value.respond_to?(:empty?) && value.empty?) }
  end
end

#idObject



77
78
79
# File 'lib/async_request_reply/worker.rb', line 77

# Returns #uuid under the name +id+. #destroy uses it as the key to delete
# and #perform_async passes it to the async engine.
def id
  uuid
end

#new_record?(p_uuid) ⇒ Boolean



71
72
73
74
75
# File 'lib/async_request_reply/worker.rb', line 71

# Tells whether +p_uuid+ refers to a record that is not stored yet.
#
# @param p_uuid identifier to check; nil always counts as new
# @return [Boolean] true when +p_uuid+ is nil or the repository adapter
#   returns nil for it
def new_record?(p_uuid)
  p_uuid.nil? || @@config.repository_adapter.get(p_uuid).nil?
end

#performObject



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/async_request_reply/worker.rb', line 184

# Runs the worker's workflow and records the outcome.
#
# In order:
# 1. stores the start reading of the monotonic clock and moves the record
#    to :processing;
# 2. runs the main chain with MethodsChain.run_methods_chain(class_instance,
#    methods_chain);
# 3. on a truthy result, runs the +success+ callback chain and stores :done;
#    on a falsy result, runs the +failure+ callback chain and stores
#    :unprocessable_entity together with the formatted callback result;
# 4. any StandardError raised along the way (including a refused update) is
#    logged at fatal level and stored as :internal_server_error.
#
# A callback whose :class_instance is 'self' receives the main chain's
# result as its target.
#
# @return the callback chain's result, or nil when an exception was rescued
def perform
  update_refused = "Can't update worker while it's performing"

  @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @@config.logger.info("Start perform worker #{uuid}")

  raise update_refused unless update(status: :processing)

  element = MethodsChain.run_methods_chain(class_instance, methods_chain)

  if element
    @@config.logger.info("successful workflow perform worker #{uuid}")

    callback_target = if success[:class_instance] == 'self'
                        element
                      else
                        success[:class_instance]
                      end
    outcome = MethodsChain.run_methods_chain(callback_target, success[:methods_chain])

    @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    raise update_refused unless update(status: :done)

    @@config.logger.info("Done perform worker #{uuid}")
    outcome
  else
    @@config.logger.error("failure workflow perform worker #{uuid}")

    # element is falsy in this branch, so a 'self' target passes that
    # falsy value on to the failure chain.
    callback_target = if failure[:class_instance] == 'self'
                        element
                      else
                        failure[:class_instance]
                      end
    outcome = MethodsChain.run_methods_chain(callback_target, failure[:methods_chain])

    @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    raise update_refused unless update(
      status: :unprocessable_entity,
      errors: formated_erros_to_json(outcome)
    )

    @@config.logger.error("Done perform worker #{uuid} with fails #{formated_erros_to_json(outcome)}")
    outcome
  end
rescue StandardError => e
  @@config.logger.fatal("Fatal perform worker #{uuid} with fails #{formated_erros_to_json(e.message)}")
  @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  update(status: :internal_server_error, errors: formated_erros_to_json(e.message))
  nil
end

#perform_asyncObject



126
127
128
129
# File 'lib/async_request_reply/worker.rb', line 126

# Persists the worker and hands its id to the async engine.
#
# Nothing is enqueued when #save reports failure (for example when #valid?
# rejects the record): the record was not written, so a job for it would
# have nothing to load. #perform already treats a falsy #update/#save
# result as a failure in the same way.
#
# @return the engine's +perform_async+ result, or nil when the worker could
#   not be saved
def perform_async
  return nil unless save

  handle_async_engine.perform_async(id)
end

#reload!Object



105
106
107
# File 'lib/async_request_reply/worker.rb', line 105

# Re-reads the stored record for this worker's uuid and assigns the stored
# values onto the instance.
#
# @return the result of assign_attributes
def reload!
  stored = self.class._find(uuid)
  assign_attributes(stored)
end

#saveObject



120
121
122
123
124
# File 'lib/async_request_reply/worker.rb', line 120

# Validates the worker and writes it to the repository.
#
# The packed record (see #to_msgpack) is stored under #uuid with an expiry
# of _ttl, or LIVE_TIMEOUT when no _ttl is set. Whatever the adapter's
# +setex+ returns is unpacked and assigned back onto the instance.
# NOTE(review): this relies on the adapter's setex returning the packed
# payload - confirm against the adapter contract.
#
# @return nil when the worker is invalid, otherwise the result of
#   assign_attributes
def save
  return nil unless valid?

  expires_in = _ttl || LIVE_TIMEOUT
  stored_payload = @@config.repository_adapter.setex(uuid, expires_in, to_msgpack)
  assign_attributes(self.class.unpack(stored_payload))
end

#to_msgpackObject

Serializa a instância usando o MessagePack. Além de ser mais rápido e menor que JSON, é uma boa opção para serializar arquivos. Ref.: msgpack.org/ Ref.: github.com/msgpack/msgpack-ruby#extension-types



145
146
147
# File 'lib/async_request_reply/worker.rb', line 145

# Serializes the instance with MessagePack: the JSON-compatible form of
# #attributes is dumped through the class-level message pack factory.
#
# @return the packed representation produced by the factory's +dump+
def to_msgpack
  factory = self.class.message_pack_factory
  payload = attributes.as_json
  factory.dump(payload)
end

#update(attrs) ⇒ Object



100
101
102
103
# File 'lib/async_request_reply/worker.rb', line 100

# Assigns the given attributes and persists the worker.
#
# NOTE(review): #save runs #valid?, which resets @errors to a new array, so
# an +errors:+ value passed here (as #perform does) appears to be
# overwritten before it can be read back - confirm whether that is intended.
#
# @param attrs [Hash] attributes to assign before saving
# @return the result of #save (nil when the worker is invalid)
def update(attrs)
  assign_attributes(attrs)
  save
end

#valid?Boolean



36
37
38
39
40
41
# File 'lib/async_request_reply/worker.rb', line 36

# Checks that the worker can be saved and rebuilds the error list.
#
# The only rule is that class_instance must not be nil. @errors is replaced
# on every call, so earlier contents are discarded.
#
# @return [Boolean] true when no validation error was recorded
def valid?
  @errors = class_instance.nil? ? ["class_instance can't be blank."] : []
  @errors.empty?
end

#with_async_engine(engine_class) ⇒ Object



135
136
137
138
# File 'lib/async_request_reply/worker.rb', line 135

# Stores the engine to use for asynchronous execution and returns the
# worker itself, so the call can be chained.
#
# @param engine_class the engine to remember in @handle_async_engine
# @return [self]
def with_async_engine(engine_class)
  tap { @handle_async_engine = engine_class }
end