Class: AsyncRequestReply::WorkerInBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/async_request_reply/worker_in_batch.rb

Defined Under Namespace

Classes: WorkerInBatchNotFound

Constant Summary collapse

ONE_HOUR =
3600
LIVE_TIMEOUT =
ONE_HOUR
@@config =
AsyncRequestReply::Config.instance

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uuid = nil) ⇒ WorkerInBatch

Initializes a new batch of workers with an optional UUID.

If a UUID is not provided or the UUID is invalid, a new UUID is generated.

Parameters:

  • uuid (String, nil) (defaults to: nil)

    The UUID of the batch. If nil, a new UUID is generated.



45
46
47
48
49
50
51
52
53
54
# File 'lib/async_request_reply/worker_in_batch.rb', line 45

# Builds an in-memory batch with empty worker, metadata and state lists.
#
# The given uuid is kept unless +new_record?+ reports it as a new record, in
# which case a fresh "async_request_in_batch:<random uuid>" key is generated.
#
# @param uuid [String, nil] key of an existing batch, or nil for a new batch
def initialize(uuid = nil)
  @worker_ids = []
  @meta = {}
  @uuid =
    if new_record?(uuid)
      "async_request_in_batch:#{SecureRandom.uuid}"
    else
      uuid
    end

  # Per-state lists of worker ids; all start empty.
  @waiting = []
  @processing = []
  @failures = []
  @successes = []
end

Instance Attribute Details

#end_time ⇒ Float, nil

The monotonic clock reading recorded when #perform finished running every worker, or nil if the batch has not finished.

Returns:

  • (Float, nil)

    the end timestamp in seconds, or nil.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the batch's end timestamp.
#
# @return [Float, nil] the value of +@end_time+; #perform sets it from the
#   monotonic clock after the last worker has run, and the constructor leaves
#   it unset (nil)
def end_time
  @end_time
end

#failures ⇒ Array<String>

The UUIDs of the workers that #perform recorded as failed (status "unprocessable_entity" or "internal_server_error").

Returns:

  • (Array<String>)

    an array of failed worker UUIDs.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the ids of workers that ended in failure.
#
# @return [Array<String>] the value of +@failures+; #perform appends a worker
#   id here when the worker's status is "unprocessable_entity" or
#   "internal_server_error"
def failures
  @failures
end

#meta ⇒ Hash

Metadata stored alongside the batch. It starts as an empty Hash and is included in the serialized batch.

Returns:

  • (Hash)

    the batch metadata.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the batch metadata.
#
# @return [Hash] the value of +@meta+; the constructor initializes it to an
#   empty Hash and #as_json includes it under the +:meta+ key
def meta
  @meta
end

#processing ⇒ Array<String>

The UUIDs of the workers that #perform is currently running.

Returns:

  • (Array<String>)

    an array of in-progress worker UUIDs.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the ids of workers currently being run.
#
# @return [Array<String>] the value of +@processing+; #perform moves a worker
#   id here while that worker runs and removes it once the outcome is known
def processing
  @processing
end

#start_time ⇒ Float, nil

The monotonic clock reading recorded when #perform started, or nil if the batch has not been started.

Returns:

  • (Float, nil)

    the start timestamp in seconds, or nil.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the batch's start timestamp.
#
# @return [Float, nil] the value of +@start_time+; #perform sets it from the
#   monotonic clock before running any worker, and the constructor leaves it
#   unset (nil)
def start_time
  @start_time
end

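#successes ⇒ Array<String>

The UUIDs of the workers that #perform recorded as successful (any status other than "unprocessable_entity" or "internal_server_error").

Returns:

  • (Array<String>)

    an array of successful worker UUIDs.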



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the ids of workers that completed without a failure status.
#
# @return [Array<String>] the value of +@successes+; #perform appends a worker
#   id here when the worker's status is neither "unprocessable_entity" nor
#   "internal_server_error"
def successes
  @successes
end

#uuidString

Returns the UUID of the batch.

Returns:

  • (String)

    The UUID of the batch.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the batch key.
#
# @return [String] the value of +@uuid+; the constructor sets it either to the
#   uuid it was given or to a generated "async_request_in_batch:<random uuid>"
#   string, and #save uses it as the repository key
def uuid
  @uuid
end

#waiting ⇒ Array<String>

The UUIDs of the workers that #perform has not started yet.

Returns:

  • (Array<String>)

    an array of pending worker UUIDs.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the ids of workers that have not been started yet.
#
# @return [Array<String>] the value of +@waiting+; #perform fills it with a
#   copy of #worker_ids and removes one id each time it starts a worker
def waiting
  @waiting
end

#worker_idsArray<String>

The worker UUIDs associated with this batch.

Returns:

  • (Array<String>)

    an array of worker UUIDs.



29
30
31
# File 'lib/async_request_reply/worker_in_batch.rb', line 29

# Reader for the ids of every worker that belongs to the batch.
#
# @return [Array<String>] the value of +@worker_ids+; #workers= appends the
#   uuid of each worker it is given
def worker_ids
  @worker_ids
end

Class Method Details

.find(p_uuid) ⇒ AsyncRequestReply::WorkerInBatch?

Finds a `WorkerInBatch` by its UUID. Returns nil when no batch is stored under that UUID.

Parameters:

  • p_uuid (String)

    The UUID of the batch to find.

Returns:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/async_request_reply/worker_in_batch.rb', line 83

# Loads a stored batch and rebuilds it as an instance.
#
# The stored record is looked up through +_find+; each persisted attribute is
# then copied onto a new instance exactly once.
#
# @param p_uuid [String] key of the batch to load
# @return [AsyncRequestReply::WorkerInBatch, nil] the rebuilt batch, or nil
#   when +_find+ returns nothing for +p_uuid+
def self.find(p_uuid)
  resource = _find(p_uuid)
  return nil unless resource

  instance = new(resource['uuid'])
  instance.worker_ids = resource['worker_ids']
  instance.start_time = resource['start_time']
  instance.end_time = resource['end_time']
  instance.waiting = resource['waiting']
  instance.processing = resource['processing']
  instance.failures = resource['failures']
  instance.successes = resource['successes']
  instance.meta = resource['meta']
  instance
end

.find!(p_uuid) ⇒ AsyncRequestReply::WorkerInBatch?

Finds a `WorkerInBatch` by its UUID, raising `WorkerInBatchNotFound` if no batch is found.

Parameters:

  • p_uuid (String)

    The UUID of the batch to find.

Returns:

Raises:



72
73
74
75
76
77
# File 'lib/async_request_reply/worker_in_batch.rb', line 72

# Strict variant of +find+: returns the batch or raises when there is none.
#
# @param p_uuid [String] key of the batch to load
# @return [AsyncRequestReply::WorkerInBatch] the batch returned by +find+
# @raise [WorkerInBatchNotFound] when +find+ returns nothing; the exception
#   message is +p_uuid+
def self.find!(p_uuid)
  find(p_uuid) || raise(WorkerInBatchNotFound, p_uuid)
end

Instance Method Details

#as_jsonHash

Returns a JSON representation of the batch.

Returns:

  • (Hash)

    The JSON-compatible representation of the batch.



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/async_request_reply/worker_in_batch.rb', line 187

# Builds the Hash snapshot of the batch that #save serializes.
#
# Keys are inserted in a fixed order: identity and timing first, then each
# state list followed by its count, then metadata and the overall totals.
#
# @return [Hash] symbol-keyed snapshot of the batch state
def as_json
  snapshot = {
    uuid: @uuid,
    start_time: @start_time,
    end_time: @end_time,
    worker_ids: @worker_ids
  }

  snapshot[:waiting] = @waiting
  snapshot[:qtd_waiting] = @waiting.count
  snapshot[:processing] = @processing
  snapshot[:qtd_processing] = @processing.count
  snapshot[:failures] = @failures
  snapshot[:qtd_fail] = @failures.count
  snapshot[:successes] = @successes
  snapshot[:qtd_success] = @successes.count

  snapshot[:meta] = @meta
  snapshot[:qtd_processed] = processed.count
  snapshot[:total] = total
  snapshot
end

#elapsedFloat?

Returns the elapsed time for the batch.

The elapsed time is the difference between the end time and the start time. If the batch has no end time yet, the current monotonic clock reading is used in its place.

Returns:

  • (Float, nil)

    The elapsed time in seconds or nil if start time is unavailable.



143
144
145
146
147
# File 'lib/async_request_reply/worker_in_batch.rb', line 143

# Seconds between the batch's start and its end.
#
# When no end time has been recorded, the current monotonic clock reading
# stands in for it, so the result keeps growing while the batch runs.
#
# @return [Float, nil] elapsed seconds, or nil when no start time is set
def elapsed
  started_at = @start_time
  return unless started_at

  finished_at = @end_time || Process.clock_gettime(Process::CLOCK_MONOTONIC)
  finished_at - started_at
end

#idString

Returns the UUID of the batch.

Returns:

  • (String)

    The UUID of the batch.



103
104
105
# File 'lib/async_request_reply/worker_in_batch.rb', line 103

# Alternate name for #uuid; #perform_async passes it to +find+ to reload the
# batch.
#
# @return [String] whatever #uuid returns
def id
  uuid
end

#performvoid

This method returns an undefined value.

Starts the asynchronous processing of all workers in the batch.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/async_request_reply/worker_in_batch.rb', line 152

# Runs every worker of the batch one after another in the calling thread,
# persisting the batch state after each transition.
#
# Worker ids are taken from the end of the waiting list, so workers run in
# reverse order of #worker_ids. After a worker has run, a status of
# "unprocessable_entity" or "internal_server_error" records it as a failure;
# any other status records it as a success. A worker id that can no longer be
# loaded is also recorded as a failure, so one missing worker does not abort
# the rest of the batch.
#
# @return [void]
def perform
  # TODO: Add concurrency model.
  @start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Fill the waiting list before the first save so the very first persisted
  # snapshot already lists the pending workers.
  @waiting = worker_ids.dup
  save
  @waiting.size.times do
    @processing.push(@waiting.pop)
    save
    worker = AsyncRequestReply::Worker.find(@processing.last)
    # NOTE(review): assumes Worker.find returns nil for a missing record, as
    # WorkerInBatch.find does; confirm against the Worker class.
    failed = worker.nil?
    unless failed
      # Keep a failing worker from raising so the loop can continue.
      worker.raise_error = false
      worker.perform
      worker.reload!
      failed = %w[unprocessable_entity internal_server_error].include?(worker.status)
    end
    (failed ? @failures : @successes).push(@processing.pop)
    save
  end

  @end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  save
end

#perform_asyncObject



177
178
179
180
181
182
# File 'lib/async_request_reply/worker_in_batch.rb', line 177

# Persists the batch, then schedules it through a wrapping worker.
#
# The wrapping worker is given a method chain that calls
# +WorkerInBatch.find+ with this batch's id and then +perform+ on the result.
#
# @return [Object] whatever the wrapping worker's +perform_async+ returns
def perform_async
  save
  runner = AsyncRequestReply::Worker.new(
    class_instance: AsyncRequestReply::WorkerInBatch,
    methods_chain: [[:find, id], [:perform]]
  )
  runner.perform_async
end

#processed ⇒ Array<String>

Returns the UUIDs of the processed workers (successes followed by failures).

Returns:

  • (Array<String>)

    The UUIDs of the processed workers; call count on it for the number processed.



133
134
135
# File 'lib/async_request_reply/worker_in_batch.rb', line 133

# Worker ids that have finished, successes first and failures after them.
#
# @return [Array<String>] a new array concatenating +@successes+ and
#   +@failures+; #as_json reports its count as +:qtd_processed+
def processed
  @successes + @failures
end

#savevoid

This method returns an undefined value.

Saves the current state of the batch to the repository.

The batch data is serialized to JSON and stored in the repository with an expiration time of 1 hour.



118
119
120
121
# File 'lib/async_request_reply/worker_in_batch.rb', line 118

# Writes the batch snapshot to the configured repository adapter.
#
# The snapshot from #as_json is serialized to JSON and stored under the batch
# uuid with a time-to-live of LIVE_TIMEOUT seconds.
#
# @return [Object] whatever the adapter's +setex+ returns
def save
  # TODO-2024-11-27: Decide serializer strategy (e.g., json, message_packer).
  store = @@config.repository_adapter
  store.setex(uuid, LIVE_TIMEOUT, as_json.to_json)
end

#totalInteger

Returns the total number of workers in the batch.

Returns:

  • (Integer)

    The total count of workers in the batch.



126
127
128
# File 'lib/async_request_reply/worker_in_batch.rb', line 126

# Number of workers that belong to the batch.
#
# @return [Integer] the count of #worker_ids
def total
  worker_ids.count
end

#workers=(workers) ⇒ Object

Assigns workers to the batch.

The workers are saved, and their UUIDs are stored in the batch.

Parameters:



61
62
63
64
65
66
# File 'lib/async_request_reply/worker_in_batch.rb', line 61

# Adds workers to the batch.
#
# Each worker is saved and its uuid is appended to the batch's worker ids.
# Ids are appended, not replaced, so assigning again extends the list.
#
# @param workers [Enumerable] objects responding to +save+ and +uuid+
# @return [Enumerable] the +workers+ argument
def workers=(workers)
  # Iterate for the side effects only; no mapped result is needed.
  workers.each do |worker|
    worker.save
    @worker_ids << worker.uuid
  end
end