Chutzen

Chutzen is a little toolkit to help with implementing batch processing. It uses Sidekiq as a job queue for incoming jobs and as a notification queue for outgoing notifications.

Enqueueing a job

Let's look at an example app called Squirrel that wants to collect GIFs from the internet.

Chutzen.perform_async(
  'notify' => { 'class' => 'Squirrel::Result' },
  'result' => { 'id' => 42 },
  'commands' => [
    {
      'execute' => [
        'curl',
        {
          '-o' => 'food.gif'
        },
        'https://example.com/food.gif'
      ],
      'skip_when' => '12 > 24',
      'fail_when' => { 'read_timeout' => 2.5 },
      'result' => {
        'food' => 'found'
      }
    }
  ]
)

Dynamic expressions

Some properties in the job description accept dynamic expressions. Expressions are an onion of expression, template, and expression.

Let's start with a complex example and then peel the onion.

${video.width} < ${minimal_width}

Any expression is first evaluated as a template. Everything between ${ and } is evaluated as a query, and the result replaces it in the string. The resulting string is then evaluated as a query again, which produces a value. So it's queries, inside a template, inside a query.

Let's take the following job dictionary as an example:

{ 'minimal_width' => 512, 'video' => { 'width' => 1080 } }

In step one, we replace every variable expression in the template with its query result.

video.width => 1080
minimal_width => 512

So the resulting string is

1080 < 512

In certain properties (e.g. skip_when), the resulting string is evaluated as a query.

1080 < 512 => false

The documentation below will tell you when a property is treated as a template to produce a string and when a property is treated as a query to return a (boolean) value.
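The two phases above can be sketched in plain Ruby. This is a minimal, hypothetical illustration and not Chutzen's implementation: the helper names `lookup`, `render_template`, and `evaluate_comparison` are made up, and the query phase here only understands a single `<` or `>` comparison between integers.

```ruby
# Hypothetical sketch of the two evaluation phases; not Chutzen's code.

# Follow a dotted path through nested hashes and arrays.
def lookup(dictionary, path)
  path.split('.').reduce(dictionary) do |node, segment|
    case node
    when Array then segment.match?(/\A\d+\z/) ? node[segment.to_i] : nil
    when Hash then node[segment]
    end
  end
end

# Phase one: replace every ${...} with the result of its query.
def render_template(template, dictionary)
  template.gsub(/\$\{([^}]*)\}/) { lookup(dictionary, Regexp.last_match(1).strip).to_s }
end

# Phase two (simplified assumption): evaluate "<integer> <op> <integer>".
def evaluate_comparison(expression)
  left, operator, right = expression.split
  raise ArgumentError, "unsupported operator: #{operator}" unless %w[< >].include?(operator)

  left.to_i.public_send(operator, right.to_i)
end

dictionary = { 'minimal_width' => 512, 'video' => { 'width' => 1080 } }
rendered = render_template('${video.width} < ${minimal_width}', dictionary)
rendered                      # => "1080 < 512"
evaluate_comparison(rendered) # => false
```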

Variable expressions

A variable expression starts with ${ and ends with }. The expression can be anything valid in the Chutzen query language (see below).

A literal expression starts with @ and is replaced by the entire unparsed output from a command.

{
  'duration' => 'The duration is: ${ffprobe.format.duration}',
  #=> {"duration":"The duration is: 55433.234"}
  'ffprobe_output' => '@ffprobe'
  #=> {"ffprobe_output":{"streams":[…],"format":…}}
}
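A minimal sketch of how such a property value might be resolved, assuming a command's parsed output is stored in the dictionary under the command's name (as the `ffprobe` example suggests). `resolve` is a hypothetical helper, and its nested lookup only walks hashes.

```ruby
require 'json'

# Hypothetical sketch, not Chutzen's code.
# "@name" returns the stored value as-is; anything else is a template
# whose ${...} parts are replaced with the looked-up values.
def resolve(value, dictionary)
  return dictionary[value.delete_prefix('@')] if value.start_with?('@')

  value.gsub(/\$\{([^}]*)\}/) do
    path = Regexp.last_match(1).strip.split('.')
    path.reduce(dictionary) { |node, key| node.is_a?(Hash) ? node[key] : nil }.to_s
  end
end

dictionary = { 'ffprobe' => { 'format' => { 'duration' => '55433.234' } } }
result = {
  'duration' => resolve('The duration is: ${ffprobe.format.duration}', dictionary),
  'ffprobe_output' => resolve('@ffprobe', dictionary)
}
puts JSON.generate(result)
# {"duration":"The duration is: 55433.234","ffprobe_output":{"format":{"duration":"55433.234"}}}
```

Note that the template form always produces a string, while the literal form keeps the original structure.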

The Chutzen query language

The primary purpose of the query language is to select values from nested hashes and arrays.

The concept of the language is that you dig into a mixed structure of hashes and arrays. Numeric values reference the n-th item in an array. Strings are used as the key in a hash. The parts of a query are separated by dots.

For example, the following Chutzen expression is equivalent to the Ruby expression below it:

streams.0.height
dictionary['streams'][0]['height']

If you don't want to reference the entire path, you can prefix an expression with //. This performs a depth-first search for the first occurrence of that key in the structure. Deep references with an array index generally don't make much sense.

For example, given the following structure:

{
  'streams' => [
    { 'height' => 12, 'width' => 12, 'codec' => 'H.264' },
    { 'height' => 42 }
  ]
}

These are valid queries with their result.

streams.1.height => 42
streams.0.codec => 'H.264'
streams.1.codec => nil
//height => 12
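The lookups above could be implemented roughly as follows. This is a hypothetical sketch: `query` and `deep_find` are illustrative names, and the `//` form here only accepts a single key, not a further dotted path.

```ruby
# Hypothetical sketch of path lookup; not Chutzen's actual code.

# Depth-first search for the first value stored under +key+.
def deep_find(node, key)
  return node[key] if node.is_a?(Hash) && node.key?(key)

  children = case node
             when Hash then node.values
             when Array then node
             else []
             end
  children.each do |child|
    found = deep_find(child, key)
    return found unless found.nil?
  end
  nil
end

# Numeric segments index arrays; other segments are hash keys.
def query(dictionary, expression)
  return deep_find(dictionary, expression.delete_prefix('//')) if expression.start_with?('//')

  expression.split('.').reduce(dictionary) do |node, segment|
    case node
    when Array then segment.match?(/\A\d+\z/) ? node[segment.to_i] : nil
    when Hash then node[segment]
    end
  end
end

dictionary = {
  'streams' => [
    { 'height' => 12, 'width' => 12, 'codec' => 'H.264' },
    { 'height' => 42 }
  ]
}
query(dictionary, 'streams.1.height') # => 42
query(dictionary, '//height')         # => 12
```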

Identifiers reference a value in the structure. You can also compare expressions with the =, &, |, <, and > operators.

12 = 12 => true
streams.1.height = 65 => false
//height = 12 => true
streams.0.codec = 'MOV' => false
streams.0.height > 12 => false
true & false => false
true | false => true
streams.1.codec = undefined => true
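To show how these operators could combine with the lookups, here is a small hypothetical evaluator. Several details are assumptions not stated above: | binds loosest, then &, then the comparisons; `undefined` maps to Ruby's `nil`; and `<`/`>` return false when either side is undefined. `TinyQuery` is an illustrative name, not part of Chutzen.

```ruby
# Hypothetical sketch of an evaluator for =, &, |, < and >; not Chutzen's code.
class TinyQuery
  TOKEN = /'[^']*'|[=&|<>]|[^\s=&|<>']+/.freeze

  def initialize(dictionary)
    @dictionary = dictionary
  end

  def evaluate(expression)
    @tokens = expression.scan(TOKEN)
    value = parse_or
    raise ArgumentError, "unexpected token: #{@tokens.first}" unless @tokens.empty?

    value
  end

  private

  # or := and ('|' and)*
  def parse_or
    value = parse_and
    while @tokens.first == '|'
      @tokens.shift
      right = parse_and
      value = value || right
    end
    value
  end

  # and := comparison ('&' comparison)*
  def parse_and
    value = parse_comparison
    while @tokens.first == '&'
      @tokens.shift
      right = parse_comparison
      value = value && right
    end
    value
  end

  # comparison := operand (('=' | '<' | '>') operand)?
  def parse_comparison
    left = parse_operand
    return left unless %w[= < >].include?(@tokens.first)

    operator = @tokens.shift
    right = parse_operand
    case operator
    when '=' then left == right
    when '<' then !left.nil? && !right.nil? && left < right
    else !left.nil? && !right.nil? && left > right
    end
  end

  def parse_operand
    token = @tokens.shift
    raise ArgumentError, 'unexpected end of expression' if token.nil?

    case token
    when /\A-?\d+\z/ then token.to_i
    when /\A-?\d+\.\d+\z/ then token.to_f
    when /\A'(.*)'\z/ then Regexp.last_match(1)
    when 'true' then true
    when 'false' then false
    when 'undefined' then nil
    else lookup(token)
    end
  end

  # Dotted paths dig into hashes and arrays; //key searches depth-first.
  def lookup(path)
    return deep_find(@dictionary, path.delete_prefix('//')) if path.start_with?('//')

    path.split('.').reduce(@dictionary) do |node, segment|
      case node
      when Array then segment.match?(/\A\d+\z/) ? node[segment.to_i] : nil
      when Hash then node[segment]
      end
    end
  end

  def deep_find(node, key)
    return node[key] if node.is_a?(Hash) && node.key?(key)

    children = case node
               when Hash then node.values
               when Array then node
               else []
               end
    children.each do |child|
      found = deep_find(child, key)
      return found unless found.nil?
    end
    nil
  end
end
```

With the structure above, `TinyQuery.new(dictionary).evaluate('//height = 12')` returns `true` and `evaluate('streams.1.codec = undefined')` returns `true`.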

Job description

  • notify: A hash of options used to enqueue a Sidekiq worker to report back after a command was performed. This section must include at least a class, but may also include all other valid Sidekiq job settings like queue. Note that the args setting will be overwritten with the payload.
  • result: A hash that is sent as a payload with each notification. You can use this to send a reference to the job so you can always associate results and errors with the job. Interpreted as template.
  • commands: A list of command hashes which must be performed to complete the job.

Command description

  • execute: A string or list which describes a command to be executed, see below for details. Interpreted as template.
  • skip_when: Describes when to skip the command. Interpreted as boolean query.
  • perform_when: Describes when to perform the command. Inverse of skip_when for convenience. Interpreted as boolean query.
  • fail_when: Describes when to kill the command (e.g. in case of a timeout).
  • merge: Hash to instruct that an output stream has to be stored in the global job dictionary, see below for details.
  • result: Hash with values to use as the payload for the notify worker. Values may include variables. Note that the payload will be encoded as JSON to increase interoperability and allow for easier backwards compatibility in case of notification payload changes. Properties are interpreted as template.
  • remember: Merge keys into the global job dictionary. Properties are interpreted as template.
  • optional: When set to true the command will not throw an exception in case of a non-zero exit code of the executed command and it will continue processing the next command.
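To make the job and command descriptions concrete, here is a hypothetical sketch of the Sidekiq job hash a notification might turn into. It assumes the job-level result and the command-level result are merged into one payload, and that the JSON-encoded payload becomes the single element of args; neither detail is confirmed above. A hash of this shape (class, queue, args) is what `Sidekiq::Client.push` accepts.

```ruby
require 'json'

# Hypothetical sketch, not Chutzen's code.
def notification_job(notify, job_result, command_result)
  payload = job_result.merge(command_result) # assumption: command values win
  notify.merge('args' => [JSON.generate(payload)]) # any existing args are replaced
end

job = notification_job(
  { 'class' => 'Squirrel::Result', 'queue' => 'squirrel' },
  { 'id' => 42 },
  { 'food' => 'found' }
)
job['args'] # => ["{\"id\":42,\"food\":\"found\"}"]
```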

Execute description

When execute contains a string, it is executed verbatim. An Array is joined into a single command, and Hashes inside the Array are expanded into switches followed by their values. Arrays and Hashes are supported to make it easier to write code that merges repeating elements in a job description.
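A minimal sketch of that expansion, assuming elements are joined with single spaces and each Hash entry becomes "switch value". Chutzen may quote or join differently; `command_line` is a hypothetical helper.

```ruby
# Hypothetical sketch; the joining rules are assumptions.
def command_line(execute)
  return execute if execute.is_a?(String) # strings are used verbatim

  execute.flat_map do |part|
    if part.is_a?(Hash)
      part.flat_map { |switch, value| [switch.to_s, value.to_s] } # "switch value"
    else
      [part.to_s]
    end
  end.join(' ')
end

command_line(['curl', { '-o' => 'food.gif' }, 'https://example.com/food.gif'])
# => "curl -o food.gif https://example.com/food.gif"

# A shared Hash can be reused across commands:
common = { '-loglevel' => 'error' }
command_line(['ffmpeg', common, { '-i' => 'in.mov' }, 'out.mp4'])
# => "ffmpeg -loglevel error -i in.mov out.mp4"
```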

Fail when description

The fail_when section can contain any of the following options:

  • read_timeout: Kill the command when neither stdout nor stderr has grown for the specified number of seconds.
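One way such a watchdog could work is sketched below with Ruby's standard `Open3` and `IO.select`. This is a hypothetical illustration, not Chutzen's implementation: `run_with_read_timeout` is a made-up name, it kills with SIGKILL, and it mixes both streams into one output string.

```ruby
require 'open3'

# Hypothetical sketch of a read_timeout watchdog, not Chutzen's code.
# Kills the command when neither stdout nor stderr produced any bytes for
# read_timeout seconds. Returns [output, killed].
def run_with_read_timeout(read_timeout, *command)
  output = String.new
  killed = false
  Open3.popen3(*command) do |stdin, stdout, stderr, wait_thread|
    stdin.close
    streams = [stdout, stderr]
    until streams.empty?
      ready, = IO.select(streams, nil, nil, read_timeout)
      if ready.nil? # no output within read_timeout seconds
        begin
          Process.kill('KILL', wait_thread.pid)
        rescue Errno::ESRCH
          # the process already exited
        end
        killed = true
        break
      end
      ready.each do |io|
        begin
          output << io.readpartial(4096)
        rescue EOFError
          streams.delete(io) # this stream is finished
        end
      end
    end
    wait_thread.value # reap the child process
  end
  [output, killed]
end
```

For example, `run_with_read_timeout(0.5, 'sleep', '5')` is killed after roughly half a second because `sleep` writes nothing.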

Merge description

Currently this only supports one of two streams: stdout or stderr. The value of either section must be json, string, or integer. This setting causes the command to save the output of the indicated stream and parse it as the indicated format. The parsed value will be added to the global job dictionary which can be referenced in queries in subsequent commands.

You can use the as key to set the name under which the value is stored in the dictionary, for example when the command name is not a useful key.

{ 'merge' => { 'stdout' => 'integer', 'as' => 'user_count' } }
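A hypothetical sketch of the merge step follows. Assumptions not stated above: the command name is the default key, string keeps the raw output unchanged, and integer parses the stripped output as a base-10 integer. `merge_output` is an illustrative name.

```ruby
require 'json'

# Hypothetical sketch of the merge step, not Chutzen's code.
def merge_output(dictionary, command_name, merge, stdout: '', stderr: '')
  stream, format = merge.find { |key, _| %w[stdout stderr].include?(key) }
  raise ArgumentError, 'merge needs a stdout or stderr key' if stream.nil?

  raw = stream == 'stdout' ? stdout : stderr
  value =
    case format
    when 'json' then JSON.parse(raw)
    when 'integer' then Integer(raw.strip, 10)
    when 'string' then raw
    else raise ArgumentError, "unsupported format: #{format.inspect}"
    end
  dictionary.merge(merge.fetch('as', command_name) => value)
end

merge_output({}, 'wc', { 'stdout' => 'integer', 'as' => 'user_count' }, stdout: "42\n")
# stores 42 under 'user_count'
```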

Remember description

Merges additional data into the global job dictionary. This is useful when you want to avoid repeating complex expressions across commands or to build nested boolean expressions.

{
  'merge' => { 'stdout' => 'string', 'as' => 'result' },
  'remember' => {
    'missing' => "result = 'missing'",
    'blank' => "result = 'blank'",
    'do_nothing' => 'missing | blank'
  }
},
{
  'skip_when' => 'do_nothing'
}

Using remember, you can store an undefined value. A Chutzen expression uses the special identifier undefined to express this value. It works similarly to true and false.

{
  'remember' => {
    'count' => 'undefined',
    'work' => 'undefined = count'
  }
}

Receiving notifications

The arguments passed to notify will be used to schedule a Sidekiq job back on the queue so your application can process the result.

require 'json'
require 'sidekiq'

module Squirrel
  class Result
    include Sidekiq::Worker

    def perform(payload)
      args = JSON.parse(payload)
      # … do something with the args
    end
  end
end

Safely stopping Chutzen

Chutzen is basically a Sidekiq process with some extra trimmings. In many cases it is started in a way similar to this:

sidekiq --queue chutzen --require ./lib/chutzen.rb

Sidekiq works best with small fire-and-forget tasks written in Ruby. Chutzen is more about batch processing and generally starts additional processes in the same process group.

For example:

   3630 ?        Ssl  290:59 sidekiq 6.1.2 chutzen [2 of 2 busy]
1346100 ?        S      0:00  \_ sh -c ffmpeg
1346101 ?        RLl   64:19  |   \_ ffmpeg

You can tell Sidekiq to stop accepting new jobs by sending it the TSTP signal. The intended result is that it finishes the current job without starting new jobs so you can eventually restart Sidekiq safely. In our example that would be:

kill -TSTP 3630

The downside of this approach is that it also stops the ffmpeg process, because all processes in the process group receive the same signal. So everything stops and nothing ever finishes.

The solution is to send a Chutzen signal.

We tell Sidekiq to listen to an additional ‘control’ queue, which is just a regular queue used to send signal jobs to a specific host.

sidekiq --queue chutzen --queue chutzen-13-production \
  --require ./lib/chutzen.rb

Now we can schedule the Chutzen::Signal worker through the emit_signal convenience method.

Chutzen.emit_signal('stop', queue: 'chutzen-13-production')

The downside of using a job is that you can enqueue multiple signals, which may keep stopping Chutzen.

20.times { Chutzen.emit_signal('stop', queue: 'chutzen-13-production') }

That is why a Chutzen::Signal worker will, by default, only act on a signal within 30 seconds after it was enqueued. The expiration time can be expressed in a number of ways if you don't like the default.

Chutzen.emit_signal('stop', queue: 'c', expires_in: 10) # seconds
Chutzen.emit_signal('stop', queue: 'c', expires_at: Time.now + 3600)
Chutzen::Signal.
  set(queue: 'chutzen-5-test').
  perform_async('stop', (Time.now + 10).to_i)
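The last form shows that the signal carries an epoch timestamp as its second argument. Below is a hypothetical sketch of how the expiration options could map onto that timestamp and how a worker might check it. The 30-second default comes from the text; the helper names and treating the deadline second itself as still valid are assumptions.

```ruby
# Hypothetical sketch, not Chutzen's code.
DEFAULT_EXPIRES_IN = 30 # seconds

# Turn emit_signal-style options into an epoch timestamp.
def signal_expires_at(now: Time.now, expires_in: nil, expires_at: nil)
  return expires_at.to_i if expires_at

  (now + (expires_in || DEFAULT_EXPIRES_IN)).to_i
end

# A signal worker would skip the signal once its deadline has passed.
def signal_expired?(expires_at, now: Time.now)
  now.to_i > expires_at
end

deadline = signal_expires_at(expires_in: 10)
signal_expired?(deadline) # => false right after enqueueing
```

Because stale signals are simply ignored, enqueueing the same stop signal many times only has an effect during the short expiration window.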

New Relic integrations

Chutzen can send custom metrics and exceptions to New Relic. The New Relic gem is ‘dynamically’ loaded, so you have to make sure it's installed for this to work. You also have to provide a valid config/newrelic.yml file relative to the working directory where Chutzen is started.