Class: AsyncRequestReply::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/async_request_reply/worker.rb

Constant Summary collapse

STATUS =
%i[waiting processing done unprocessable_entity internal_server_error]
ONE_HOUR =
60*60
LIVE_TIMEOUT =

TODO-2023-10-22: Isso limita o processamento de máximo 1 hora.

ONE_HOUR
@@config =

TODO-2023-10-22: Adicinar mais logs a classe.

AsyncRequestReply::Config.instance

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attrs = {}) ⇒ Worker

Returns a new instance of Worker.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/async_request_reply/worker.rb', line 20

def initialize(attrs = {})
  attrs.transform_keys(&:to_sym)
  @uuid = new_record?(attrs[:uuid]) ? "async_request:#{SecureRandom.uuid}" : attrs[:uuid]

  # INFO: Remover do repositório depois que async_request for processado
  # TODO-2023-10-22: Entender a relação entre número de objetos criados e
  # consulmo de mémoria no host onde está o redis. Definir uma estrátegia
  # que limite o tamanho máximo de uma instância da classe e controle do ciclo
  # de vida de cada instancia no banco pra ofecer melhor controle pra cada caso
  # de uso.
  destroy(30.seconds.to_i) if !new_record?(attrs[:uuid]) && attrs[:status].to_sym == :done

  # Assigners attributes
  assign_attributes(default_attributes.merge(attrs))
end

Instance Attribute Details

#_ttlObject

Returns the value of attribute _ttl.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def _ttl
  @_ttl
end

#class_instanceObject

Returns the value of attribute class_instance.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def class_instance
  @class_instance
end

#end_timeObject

Returns the value of attribute end_time.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

def end_time
  @end_time
end

#errorsObject

Returns the value of attribute errors.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

def errors
  @errors
end

#failureObject

Returns the value of attribute failure.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def failure
  @failure
end

#methods_chainObject

Returns the value of attribute methods_chain.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def methods_chain
  @methods_chain
end

#new_recordObject (readonly)

Returns the value of attribute new_record.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

def new_record
  @new_record
end

#raise_errorObject

Returns the value of attribute raise_error.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def raise_error
  @raise_error
end

#redirect_urlObject

Returns the value of attribute redirect_url.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def redirect_url
  @redirect_url
end

#start_timeObject

Returns the value of attribute start_time.



18
19
20
# File 'lib/async_request_reply/worker.rb', line 18

def start_time
  @start_time
end

#statusObject

Returns the value of attribute status.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def status
  @status
end

#status_urlObject

Returns the value of attribute status_url.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def status_url
  @status_url
end

#successObject

Returns the value of attribute success.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def success
  @success
end

#uuidObject

Returns the value of attribute uuid.



15
16
17
# File 'lib/async_request_reply/worker.rb', line 15

def uuid
  @uuid
end

Class Method Details

._find(p_uuid) ⇒ Object



90
91
92
93
94
95
# File 'lib/async_request_reply/worker.rb', line 90

def self._find(p_uuid)
  resource = @@config.repository_adapter.get(p_uuid)
  return nil unless resource

  unpack(resource)
end

.find(p_uuid) ⇒ Object



83
84
85
86
87
88
# File 'lib/async_request_reply/worker.rb', line 83

def self.find(p_uuid)
  resource = _find(p_uuid)
  return nil if resource.empty?

  new(resource)
end

.message_pack_factoryObject

TODO: Desacoplar message pack factory



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/async_request_reply/worker.rb', line 156

def self.message_pack_factory
  factory = MessagePack::Factory.new

	@@config.message_packer_factories.each do |fac|
		factory.register_type(
			fac[:first_byte],
			fac[:klass],
			packer: fac[:packer],
			unpacker: fac[:unpacker],
			recursive: true
			)
	end

  factory
end

.unpack(packer) ⇒ Object



151
152
153
# File 'lib/async_request_reply/worker.rb', line 151

def self.unpack(packer)
  message_pack_factory.load(packer)
end

Instance Method Details

#async_engineObject



133
134
135
# File 'lib/async_request_reply/worker.rb', line 133

def async_engine
	handle_async_engine
end

#attributesObject



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/async_request_reply/worker.rb', line 43

def attributes
  { 'uuid' => uuid,
    'status' => status,
    'success' => success,
    'failure' => failure,
    'methods_chain' => methods_chain,
    'class_instance' => class_instance,
    'redirect_url' => redirect_url,
    'start_time' => start_time,
    'end_time' => end_time,
    'raise_error' => raise_error
  }
end

#default_attributesObject



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/async_request_reply/worker.rb', line 57

def default_attributes
  {
  	raise_error: true,
  	methods_chain: [],
    'status' => :waiting,
    success: {
    	class_instance: 'self',
    	methods_chain: []
    },
    failure: {
    	class_instance: 'self',
    	methods_chain: []
    }
  }
end

#destroy(seconds_in = 0.seconds.to_i) ⇒ Object

Remove request from data store. Can pass as params integer value for how many seconds you want remove from data store



115
116
117
118
119
120
# File 'lib/async_request_reply/worker.rb', line 115

def destroy(seconds_in = 0.seconds.to_i)
  return @@config.repository_adapter.del(id) if seconds_in.zero?

  self._ttl = seconds_in
  save
end

#elapsedObject



97
98
99
# File 'lib/async_request_reply/worker.rb', line 97

def elapsed
	(@end_time || Process.clock_gettime(Process::CLOCK_MONOTONIC)) - @start_time
end

#formated_erros_to_json(errors) ⇒ Object



231
232
233
234
235
236
237
238
239
# File 'lib/async_request_reply/worker.rb', line 231

def formated_erros_to_json(errors)
  resouce = if errors.respond_to?(:map)
              errors.map { |title, error| { title: title, detail: error } }
            else
              [{ title: errors }]
            end

  resouce.map { |error| error.select { |_k, v| !v.nil? && !(v.respond_to?(:empty?) && v.empty?) } }
end

#idObject



79
80
81
# File 'lib/async_request_reply/worker.rb', line 79

def id
  uuid
end

#new_record?(p_uuid) ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
76
77
# File 'lib/async_request_reply/worker.rb', line 73

def new_record?(p_uuid)
  return true if p_uuid.nil?

  @@config.repository_adapter.get(p_uuid).nil?
end

#performObject



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/async_request_reply/worker.rb', line 186

def perform
  begin
  	@start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  	@@config.logger.info("Start perform worker #{self.uuid}")

  	raise "Can't update worker while it's performing" unless update(status: :processing)
  	
  	if element = MethodsChain.run_methods_chain(class_instance, methods_chain)
  		@@config.logger.info("successful workflow perform worker #{self.uuid}")

  		klass_after = success[:class_instance] == 'self' ? element : success[:class_instance]
  		methods_after = success[:methods_chain]

  		result = MethodsChain.run_methods_chain(klass_after, methods_after)

  		@end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      
      raise "Can't update worker while it's performing" unless update(status: :done)
      @@config.logger.info("Done perform worker #{self.uuid}")
      result
    else
    	@@config.logger.error("failure workflow perform worker #{self.uuid}")
      klass_reject_after = failure[:class_instance] == 'self' ? element : failure[:class_instance]
      methods_reject_after = failure[:methods_chain]

      result = MethodsChain.run_methods_chain(klass_reject_after,methods_reject_after)
       
       @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      raise "Can't update worker while it's performing" unless update(
      			 status: :unprocessable_entity,
             errors: formated_erros_to_json(result))

      @@config.logger.error("Done perform worker #{self.uuid} with fails #{formated_erros_to_json(result)}")
      result
    end
  rescue StandardError => e
     @@config.logger.fatal("Fatal perform worker #{self.uuid} with fails #{formated_erros_to_json(e.message)}")
  	@end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    update(status: :internal_server_error, errors: formated_erros_to_json(e.message))
    raise e if @raise_error
    nil
  end
end

#perform_asyncObject



128
129
130
131
# File 'lib/async_request_reply/worker.rb', line 128

def perform_async
	save
	handle_async_engine.perform_async(id)
end

#reload!Object



107
108
109
# File 'lib/async_request_reply/worker.rb', line 107

def reload!
	assign_attributes(self.class._find(self.uuid))
end

#saveObject



122
123
124
125
126
# File 'lib/async_request_reply/worker.rb', line 122

def save
	return nil unless valid?
  attributes = self.class.unpack(@@config.repository_adapter.setex(uuid, (_ttl || LIVE_TIMEOUT), to_msgpack))
  assign_attributes(attributes)
end

#to_msgpackObject

Serializa a intância usando o MessagePack. Além de ser mais rápido e menor que JSON é uma boa opção para serializar arquivos. Ref.: msgpack.org/ Ref.: github.com/msgpack/msgpack-ruby#extension-types



147
148
149
# File 'lib/async_request_reply/worker.rb', line 147

def to_msgpack
  self.class.message_pack_factory.dump(attributes.as_json)
end

#update(attrs) ⇒ Object



102
103
104
105
# File 'lib/async_request_reply/worker.rb', line 102

def update(attrs)
  assign_attributes(attrs)
  save
end

#valid?Boolean

Returns:

  • (Boolean)


36
37
38
39
40
41
# File 'lib/async_request_reply/worker.rb', line 36

def valid?
	@errors = []
	@errors << "class_instance can't be blank." if class_instance.nil?

	@errors.empty?
end

#with_async_engine(engine_class) ⇒ Object



137
138
139
140
# File 'lib/async_request_reply/worker.rb', line 137

def with_async_engine(engine_class)
	@handle_async_engine = engine_class
	self
end