Class: Kiqchestra::DefaultWorkflowStore

Inherits:
WorkflowStore show all
Defined in:
lib/kiqchestra/default_workflow_store.rb

Overview

The DefaultWorkflowStore class is the default implementation of WorkflowStore for storing workflow dependencies, progress, and arguments in a Redis-backed store.

This implementation uses Redis for persistence, providing an efficient and scalable storage system for workflows. Users can customize this by implementing their own subclass of WorkflowStore if needed.

Example Usage:

store = Kiqchestra::DefaultWorkflowStore.new
store. "workflow_123", { a_job: { deps: [], args: [1, 2, 3] } }
 = store. "workflow_123"
store.write_progress "workflow_123", { a_job: "complete" }
progress = store.read_progress "workflow_123"

Instance Method Summary collapse

Methods inherited from WorkflowStore

#initialize

Constructor Details

This class inherits a constructor from Kiqchestra::WorkflowStore

Instance Method Details

#read_metadata(workflow_id) ⇒ Hash

Reads the metadata for a workflow from Redis.

Parameters:

  • workflow_id (String)

    The workflow ID to retrieve workflow data for.

Returns:

  • (Hash)

    A hash representing the workflow metadata, where each key is a task ID and the value is a hash with keys ‘:deps` and `:args`.



27
28
29
30
# File 'lib/kiqchestra/default_workflow_store.rb', line 27

def (workflow_id)
  raw_data = Kiqchestra::RedisClient.client.get (workflow_id)
  JSON.parse(raw_data || "{}")
end

#read_progress(workflow_id) ⇒ Hash

Reads the progress of a workflow from Redis.

Parameters:

  • workflow_id (String)

    The workflow ID to retrieve progress for.

Returns:

  • (Hash)

    A hash representing the progress of the workflow, where each key is a task ID and the value indicates the status.



48
49
50
51
# File 'lib/kiqchestra/default_workflow_store.rb', line 48

def read_progress(workflow_id)
  raw_data = Kiqchestra::RedisClient.client.get progress_key(workflow_id)
  JSON.parse(raw_data || "{}")
end

#write_metadata(workflow_id, metadata) ⇒ Object

Writes the metadata for a workflow to Redis.

Examples:

{ a_job: { deps: [], args: [1, 2, 3] }, b_job: { deps: [:a_job], args: nil } }


Parameters:

  • workflow_id (String)

    The workflow ID to store dependencies for.

  • metadata (Hash)

    A hash representing the metadata to store.



37
38
39
40
41
# File 'lib/kiqchestra/default_workflow_store.rb', line 37

def (workflow_id, )
  Kiqchestra::RedisClient.client.set (workflow_id),
                                     .to_json,
                                     ex: 604_800 # Default TTL: 7 days
end

#write_progress(workflow_id, progress) ⇒ Object

Writes the progress of a workflow to Redis.

Examples:

{ a_worker: “complete”, b_worker: “in_progress” }


Parameters:

  • workflow_id (String)

    The workflow ID to store progress for.

  • progress (Hash)

    A hash representing the progress to store.



58
59
60
61
62
# File 'lib/kiqchestra/default_workflow_store.rb', line 58

def write_progress(workflow_id, progress)
  Kiqchestra::RedisClient.client.set progress_key(workflow_id),
                                     progress.to_json,
                                     ex: 604_800 # Default TTL: 7 days
end