Module: WorkShaper

Defined in:
lib/work_shaper.rb,
lib/work_shaper/worker.rb,
lib/work_shaper/manager.rb,
lib/work_shaper/version.rb,
lib/work_shaper/offset_holder.rb

Overview

WorkShaper is inspired by Kafka partitions and offsets, but it can be used to organize and parallelize other forms of work. The original goal was to process the offsets of a given partition in parallel while preserving order for the subset of messages that share a Sub Key.

The key concepts are Sub Key, Partition, and Offset. Work for a given Sub Key must be executed in the order in which it was enqueued, but work for different Sub Keys can run in parallel. All work (offsets) on a given Partition must be acknowledged in continuous, monotonically increasing order. If work for a higher offset completes before work for a lower offset, the Manager holds the acknowledgement until all lower offsets have been acknowledged. Work (offsets) for a given Sub Key is still processed in order.
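The acknowledgement rule above can be illustrated with a minimal sketch. The `AckTracker` class and its methods are hypothetical names invented for this example; they are not part of WorkShaper and do not show how the Manager or OffsetHolder are actually implemented. The sketch only demonstrates the idea of holding a completed higher offset until every lower offset on the same partition has completed.

```ruby
# Hypothetical helper, NOT part of WorkShaper: tracks the offsets of a single
# partition and releases them for acknowledgement strictly in enqueue order.
class AckTracker
  attr_reader :acked

  def initialize
    @received  = []  # offsets in the order they were enqueued
    @completed = {}  # offset => true once its work has finished
    @acked     = []  # offsets acknowledged so far, in order
  end

  def enqueue(offset)
    @received << offset
  end

  # Marks an offset as done, then acknowledges every offset at the head of
  # the queue whose work has completed. Returns the newly released offsets.
  def complete(offset)
    @completed[offset] = true
    released = []
    while !@received.empty? && @completed.delete(@received.first)
      released << @received.shift
    end
    @acked.concat(released)
    released
  end
end

tracker = AckTracker.new
[10, 11, 12].each { |offset| tracker.enqueue(offset) }

first  = tracker.complete(12) # offset 12 is held: 10 and 11 are still pending
second = tracker.complete(10) # only 10 can be released
third  = tracker.complete(11) # 11 completes, which also releases the held 12
```

In this sketch a completed offset waits in `@completed` until it reaches the head of `@received`, so acknowledgements never skip ahead of unfinished lower offsets.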

Defined Under Namespace

Classes: Manager, OffsetHolder, Worker

Constant Summary

VERSION = "0.1.3.1"

Class Method Summary

Class Method Details

.formatter ⇒ Object



# File 'lib/work_shaper.rb', line 34

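# Returns a Logger formatter proc that renders each entry as one JSON line.
# Hash#to_json requires the json library to be loaded.
def self.formatter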
  proc do |severity, datetime, _progname, msg|
    datefmt = datetime.strftime('%Y-%m-%dT%H:%M:%S.%6N')
    {
      timestamp: datefmt,
      level: severity.ljust(5),
      pid: Process.pid,
      msg: msg
    }.to_json + "\n"
  end
end
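To show what the formatter produces, here is a minimal sketch that copies the proc body from the listing above into a local variable so it runs without the gem installed. The `StringIO` sink and the message text are assumptions made for illustration only.

```ruby
require 'json'
require 'logger'
require 'stringio'

# Same proc body as WorkShaper.formatter, copied here so the snippet is
# self-contained.
formatter = proc do |severity, datetime, _progname, msg|
  datefmt = datetime.strftime('%Y-%m-%dT%H:%M:%S.%6N')
  {
    timestamp: datefmt,
    level: severity.ljust(5),
    pid: Process.pid,
    msg: msg
  }.to_json + "\n"
end

# Capture output in memory instead of writing to $stdout.
io  = StringIO.new
log = Logger.new(io, formatter: formatter)
log.info('partition 0 ready')

line   = io.string          # one JSON document terminated by a newline
parsed = JSON.parse(line)   # keys: "timestamp", "level", "pid", "msg"
```

Because `severity.ljust(5)` pads the level to five characters, `INFO` is emitted as `"INFO "`, which keeps the level column aligned with `DEBUG`, `ERROR`, and `FATAL`.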

.logger ⇒ Object



# File 'lib/work_shaper.rb', line 26

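# Lazily builds a $stdout logger on first use. The LOG_LEVEL environment
# variable overrides the default DEBUG level.
def self.logger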
  @logger ||= Logger.new(
    $stdout,
    level: ENV['LOG_LEVEL'] || 'DEBUG',
    formatter: formatter
  )
end

.logger=(logger) ⇒ Object



# File 'lib/work_shaper.rb', line 22

# Replaces the module-wide logger, for example with an application logger.
def self.logger=(logger)
  @logger = logger
end
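A short sketch of swapping in a custom logger follows. So that it runs without the gem installed, it defines a stand-in `WorkShaper` module that copies the three class methods documented above. In a real application you would presumably `require 'work_shaper'` instead of defining the stand-in. The `StringIO` buffer, the `WARN` level, and the messages are assumptions made for illustration.

```ruby
require 'json'
require 'logger'
require 'stringio'

# Stand-in copy of the class methods listed above, used only so this
# example is self-contained. It is not the gem itself.
module WorkShaper
  def self.logger=(logger)
    @logger = logger
  end

  def self.logger
    @logger ||= Logger.new(
      $stdout,
      level: ENV['LOG_LEVEL'] || 'DEBUG',
      formatter: formatter
    )
  end

  def self.formatter
    proc do |severity, datetime, _progname, msg|
      datefmt = datetime.strftime('%Y-%m-%dT%H:%M:%S.%6N')
      {
        timestamp: datefmt,
        level: severity.ljust(5),
        pid: Process.pid,
        msg: msg
      }.to_json + "\n"
    end
  end
end

# Route output to an in-memory buffer at WARN level while reusing the
# JSON formatter.
buffer = StringIO.new
WorkShaper.logger = Logger.new(
  buffer,
  level: Logger::WARN,
  formatter: WorkShaper.formatter
)

WorkShaper.logger.debug('skipped at WARN level')
WorkShaper.logger.warn('offset 42 held')

entries = buffer.string.lines.map { |l| JSON.parse(l) }
```

Assigning the logger before the first call to `WorkShaper.logger` means the default `$stdout` logger is never built, because `@logger ||=` only creates one when none has been set.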