Class: Gush::Client
- Inherits:
-
Object
- Object
- Gush::Client
- Defined in:
- lib/gush/client.rb
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
Instance Method Summary collapse
- #all_workflows ⇒ Object
- #configure {|configuration| ... } ⇒ Object
- #create_workflow(name) ⇒ Object
- #destroy_job(workflow_id, job) ⇒ Object
- #destroy_workflow(workflow) ⇒ Object
- #enqueue_job(workflow_id, job) ⇒ Object
- #find_workflow(id) ⇒ Object
-
#initialize(config = Gush.configuration) ⇒ Client
constructor
A new instance of Client.
- #load_job(workflow_id, job_id) ⇒ Object
- #next_free_id ⇒ Object
- #persist_job(workflow_id, job) ⇒ Object
- #persist_workflow(workflow) ⇒ Object
- #start_workflow(workflow, job_names = []) ⇒ Object
- #stop_workflow(id) ⇒ Object
- #worker_report(message) ⇒ Object
- #workflow_report(message) ⇒ Object
Constructor Details
#initialize(config = Gush.configuration) ⇒ Client
Returns a new instance of Client.
5 6 7 8 9 |
# File 'lib/gush/client.rb', line 5 def initialize(config = Gush.configuration) @configuration = config @sidekiq = build_sidekiq @redis = build_redis end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
3 4 5 |
# File 'lib/gush/client.rb', line 3 def configuration @configuration end |
Instance Method Details
#all_workflows ⇒ Object
59 60 61 62 63 64 |
# File 'lib/gush/client.rb', line 59 def all_workflows redis.keys("gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") find_workflow(id) end end |
#configure {|configuration| ... } ⇒ Object
11 12 13 14 15 |
# File 'lib/gush/client.rb', line 11 def configure yield configuration @sidekiq = build_sidekiq @redis = build_redis end |
#create_workflow(name) ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/gush/client.rb', line 17 def create_workflow(name) begin flow = name.constantize.new flow.save rescue NameError raise WorkflowNotFound.new("Workflow with given name doesn't exist") end flow end |
#destroy_job(workflow_id, job) ⇒ Object
102 103 104 |
# File 'lib/gush/client.rb', line 102 def destroy_job(workflow_id, job) redis.del("gush.jobs.#{workflow_id}.#{job.class.to_s}") end |
#destroy_workflow(workflow) ⇒ Object
97 98 99 100 |
# File 'lib/gush/client.rb', line 97 def destroy_workflow(workflow) redis.del("gush.workflows.#{workflow.id}") workflow.jobs.each {|job| destroy_job(workflow.id, job) } end |
#enqueue_job(workflow_id, job) ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/gush/client.rb', line 114 def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) sidekiq.push( 'class' => Gush::Worker, 'queue' => configuration.namespace, 'args' => [workflow_id, job.class.to_s] ) end |
#find_workflow(id) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/gush/client.rb', line 66 def find_workflow(id) data = redis.get("gush.workflows.#{id}") unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) keys = redis.keys("gush.jobs.#{id}.*") nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } workflow_from_hash(hash, nodes) else raise WorkflowNotFound.new("Workflow with given id doesn't exist") end end |
#load_job(workflow_id, job_id) ⇒ Object
89 90 91 92 93 94 95 |
# File 'lib/gush/client.rb', line 89 def load_job(workflow_id, job_id) workflow = find_workflow(workflow_id) data = redis.get("gush.jobs.#{workflow_id}.#{job_id}") return nil if data.nil? data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(workflow, data) end |
#next_free_id ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/gush/client.rb', line 49 def next_free_id id = nil loop do id = SecureRandom.uuid break if !redis.exists("gush.workflow.#{id}") end id end |
#persist_job(workflow_id, job) ⇒ Object
85 86 87 |
# File 'lib/gush/client.rb', line 85 def persist_job(workflow_id, job) redis.set("gush.jobs.#{workflow_id}.#{job.class.to_s}", job.to_json) end |
#persist_workflow(workflow) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/gush/client.rb', line 78 def persist_workflow(workflow) redis.set("gush.workflows.#{workflow.id}", workflow.to_json) workflow.jobs.each {|job| persist_job(workflow.id, job) } workflow.mark_as_persisted true end |
#start_workflow(workflow, job_names = []) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/gush/client.rb', line 28 def start_workflow(workflow, job_names = []) workflow.mark_as_started persist_workflow(workflow) jobs = if job_names.empty? workflow.initial_jobs else job_names.map {|name| workflow.find_job(name) } end jobs.each do |job| enqueue_job(workflow.id, job) end end |
#stop_workflow(id) ⇒ Object
43 44 45 46 47 |
# File 'lib/gush/client.rb', line 43 def stop_workflow(id) workflow = find_workflow(id) workflow.mark_as_stopped persist_workflow(workflow) end |
#worker_report(message) ⇒ Object
106 107 108 |
# File 'lib/gush/client.rb', line 106 def worker_report() report("gush.workers.status", ) end |
#workflow_report(message) ⇒ Object
110 111 112 |
# File 'lib/gush/client.rb', line 110 def workflow_report() report("gush.workflows.status", ) end |