Class: Gush::Client
- Inherits:
-
Object
- Object
- Gush::Client
- Defined in:
- lib/gush/client.rb
Constant Summary collapse
- @@redis_connection =
Concurrent::ThreadLocalVar.new(nil)
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
Class Method Summary collapse
Instance Method Summary collapse
- #all_workflows ⇒ Object
- #configure {|configuration| ... } ⇒ Object
- #create_workflow(name) ⇒ Object
- #destroy_job(workflow_id, job) ⇒ Object
- #destroy_workflow(workflow) ⇒ Object
- #enqueue_job(workflow_id, job) ⇒ Object
- #expire_job(workflow_id, job, ttl = nil) ⇒ Object
- #expire_workflow(workflow, ttl = nil) ⇒ Object
- #find_job(workflow_id, job_name) ⇒ Object
- #find_workflow(id) ⇒ Object
-
#initialize(config = Gush.configuration) ⇒ Client
constructor
A new instance of Client.
- #next_free_job_id(workflow_id, job_klass) ⇒ Object
- #next_free_workflow_id ⇒ Object
- #persist_job(workflow_id, job) ⇒ Object
- #persist_workflow(workflow) ⇒ Object
- #start_workflow(workflow, job_names = []) ⇒ Object
- #stop_workflow(id) ⇒ Object
Constructor Details
#initialize(config = Gush.configuration) ⇒ Client
Returns a new instance of Client.
20 21 22 |
# File 'lib/gush/client.rb', line 20 def initialize(config = Gush.configuration) @configuration = config end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
6 7 8 |
# File 'lib/gush/client.rb', line 6 def configuration @configuration end |
Class Method Details
.redis_connection(config) ⇒ Object
10 11 12 13 14 15 16 17 18 |
# File 'lib/gush/client.rb', line 10 def self.redis_connection(config) cached = (@@redis_connection.value ||= { url: config.redis_url, connection: nil }) return cached[:connection] if !cached[:connection].nil? && config.redis_url == cached[:url] Redis.new(url: config.redis_url).tap do |instance| RedisClassy.redis = instance @@redis_connection.value = { url: config.redis_url, connection: instance } end end |
Instance Method Details
#all_workflows ⇒ Object
83 84 85 86 87 88 |
# File 'lib/gush/client.rb', line 83 def all_workflows redis.scan_each(match: "gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") find_workflow(id) end end |
#configure {|configuration| ... } ⇒ Object
24 25 26 |
# File 'lib/gush/client.rb', line 24 def configure yield configuration end |
#create_workflow(name) ⇒ Object
28 29 30 31 32 33 34 35 |
# File 'lib/gush/client.rb', line 28 def create_workflow(name) begin name.constantize.create rescue NameError raise WorkflowNotFound.new("Workflow with given name doesn't exist") end flow end |
#destroy_job(workflow_id, job) ⇒ Object
140 141 142 |
# File 'lib/gush/client.rb', line 140 def destroy_job(workflow_id, job) redis.del("gush.jobs.#{workflow_id}.#{job.klass}") end |
#destroy_workflow(workflow) ⇒ Object
135 136 137 138 |
# File 'lib/gush/client.rb', line 135 def destroy_workflow(workflow) redis.del("gush.workflows.#{workflow.id}") workflow.jobs.each {|job| destroy_job(workflow.id, job) } end |
#enqueue_job(workflow_id, job) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/gush/client.rb', line 155 def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) queue = job.queue || configuration.namespace wait = job.wait if wait.present? Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name]) else Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name]) end end |
#expire_job(workflow_id, job, ttl = nil) ⇒ Object
150 151 152 153 |
# File 'lib/gush/client.rb', line 150 def expire_job(workflow_id, job, ttl=nil) ttl = ttl || configuration.ttl redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl) end |
#expire_workflow(workflow, ttl = nil) ⇒ Object
144 145 146 147 148 |
# File 'lib/gush/client.rb', line 144 def expire_workflow(workflow, ttl=nil) ttl = ttl || configuration.ttl redis.expire("gush.workflows.#{workflow.id}", ttl) workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) } end |
#find_job(workflow_id, job_name) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/gush/client.rb', line 120 def find_job(workflow_id, job_name) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name) data = if job_name_match find_job_by_klass_and_id(workflow_id, job_name) else find_job_by_klass(workflow_id, job_name) end return nil if data.nil? data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(data) end |
#find_workflow(id) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/gush/client.rb', line 90 def find_workflow(id) data = redis.get("gush.workflows.#{id}") unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) keys = redis.scan_each(match: "gush.jobs.#{id}.*") nodes = keys.each_with_object([]) do |key, array| array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } end workflow_from_hash(hash, nodes) else raise WorkflowNotFound.new("Workflow with given id doesn't exist") end end |
#next_free_job_id(workflow_id, job_klass) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/gush/client.rb', line 58 def next_free_job_id(workflow_id, job_klass) job_id = nil loop do job_id = SecureRandom.uuid available = !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id) break if available end job_id end |
#next_free_workflow_id ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/gush/client.rb', line 71 def next_free_workflow_id id = nil loop do id = SecureRandom.uuid available = !redis.exists?("gush.workflow.#{id}") break if available end id end |
#persist_job(workflow_id, job) ⇒ Object
116 117 118 |
# File 'lib/gush/client.rb', line 116 def persist_job(workflow_id, job) redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json) end |
#persist_workflow(workflow) ⇒ Object
107 108 109 110 111 112 113 114 |
# File 'lib/gush/client.rb', line 107 def persist_workflow(workflow) redis.set("gush.workflows.#{workflow.id}", workflow.to_json) workflow.jobs.each {|job| persist_job(workflow.id, job) } workflow.mark_as_persisted true end |
#start_workflow(workflow, job_names = []) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/gush/client.rb', line 37 def start_workflow(workflow, job_names = []) workflow.mark_as_started persist_workflow(workflow) jobs = if job_names.empty? workflow.initial_jobs else job_names.map {|name| workflow.find_job(name) } end jobs.each do |job| enqueue_job(workflow.id, job) end end |
#stop_workflow(id) ⇒ Object
52 53 54 55 56 |
# File 'lib/gush/client.rb', line 52 def stop_workflow(id) workflow = find_workflow(id) workflow.mark_as_stopped persist_workflow(workflow) end |