Class: Gush::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/gush/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Gush.configuration) ⇒ Client

Returns a new instance of Client.



7
8
9
# File 'lib/gush/client.rb', line 7

# Builds a client bound to a configuration object.
#
# @param config [Object] configuration to store on the client; when omitted,
#   the value of Gush.configuration is used
def initialize(config = Gush.configuration)
  @configuration = config
end

Instance Attribute Details

#configuration ⇒ Object (readonly)

Returns the value of attribute configuration.



5
6
7
# File 'lib/gush/client.rb', line 5

# Reader for the configuration object held in @configuration.
#
# @return [Object] the current value of @configuration
def configuration
  @configuration
end

Instance Method Details

#all_workflows ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/gush/client.rb', line 74

# Loads every workflow whose Redis key matches "gush.workflows.*".
#
# The "gush.workflows." prefix is removed from each matching key and the
# remainder is passed to find_workflow as the workflow id.
#
# @return [Array] one find_workflow result per matching key
def all_workflows
  connection_pool.with do |redis|
    workflow_keys = redis.scan_each(match: "gush.workflows.*")
    workflow_keys.map { |workflow_key| find_workflow(workflow_key.sub("gush.workflows.", "")) }
  end
end

#configure {|configuration| ... } ⇒ Object

Yields:



11
12
13
# File 'lib/gush/client.rb', line 11

# Yields the client's configuration to the caller's block.
#
# @yield [configuration] the object returned by #configuration
# @return [Object] the value returned by the block
def configure
  yield configuration
end

#create_workflow(name) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/gush/client.rb', line 15

# Creates a workflow from the name of its class.
#
# The name is resolved with +constantize+ and +create+ is called on the
# resulting constant.
#
# @param name [String] name of the workflow class to instantiate
# @return [Object] the value returned by +create+ on the resolved class
# @raise [WorkflowNotFound] when resolving or creating raises NameError
def create_workflow(name)
  # Return the created workflow itself; there is no separate `flow` local to
  # hand back, so the result of +create+ is the method's value.
  name.constantize.create
rescue NameError
  raise WorkflowNotFound, "Workflow with given name doesn't exist"
end

#destroy_job(workflow_id, job) ⇒ Object



141
142
143
144
145
# File 'lib/gush/client.rb', line 141

# Deletes the Redis key "gush.jobs.<workflow_id>.<job.klass>".
#
# NOTE(review): persist_job stores jobs as fields of this same key, so this
# appears to remove every job of that class for the workflow, not only
# +job+ — confirm that is intended.
#
# @param workflow_id [String] id used in the key
# @param job [#klass] job whose class name is used in the key
# @return [Object] the result of the Redis DEL call
def destroy_job(workflow_id, job)
  job_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.del(job_key) }
end

#destroy_workflow(workflow) ⇒ Object



134
135
136
137
138
139
# File 'lib/gush/client.rb', line 134

# Deletes the workflow's own Redis key and then calls destroy_job for each
# of its jobs.
#
# @param workflow [#id, #jobs] workflow to remove
# @return [Object] the value of +workflow.jobs.each+
def destroy_workflow(workflow)
  workflow_key = "gush.workflows.#{workflow.id}"
  connection_pool.with { |redis| redis.del(workflow_key) }

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end

#enqueue_job(workflow_id, job) ⇒ Object



162
163
164
165
166
167
168
# File 'lib/gush/client.rb', line 162

# Marks a job as enqueued, persists it, and schedules Gush::Worker for it.
#
# The worker is scheduled on +job.queue+, or on +configuration.namespace+
# when the job has no queue, and receives the workflow id and the job name.
#
# @param workflow_id [String] id of the workflow the job belongs to
# @param job [#enqueue!, #queue, #name] job to schedule
# @return [Object] the result of +perform_later+
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  target_queue = job.queue || configuration.namespace
  worker = Gush::Worker.set(queue: target_queue)
  worker.perform_later(workflow_id, job.name)
end

#expire_job(workflow_id, job, ttl = nil) ⇒ Object



155
156
157
158
159
160
# File 'lib/gush/client.rb', line 155

# Sets a time-to-live on the Redis key that holds a job.
#
# The key is "gush.jobs.<workflow_id>.<job.klass>", the same key that
# persist_job writes to and destroy_job deletes.
#
# @param workflow_id [String] id of the workflow the job belongs to
# @param job [#klass] job whose storage key should expire
# @param ttl [Integer, nil] lifetime passed to Redis EXPIRE; falls back to
#   +configuration.ttl+ when nil
# @return [Object] the result of the Redis EXPIRE call
def expire_job(workflow_id, job, ttl = nil)
  ttl ||= configuration.ttl
  connection_pool.with do |redis|
    # Jobs are stored under their class name (see persist_job), so the
    # expiry must target that key rather than one built from job.name.
    redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
  end
end

#expire_workflow(workflow, ttl = nil) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/gush/client.rb', line 147

# Sets a time-to-live on a workflow's Redis key and then calls expire_job
# for each of its jobs with the same ttl.
#
# @param workflow [#id, #jobs] workflow to expire
# @param ttl [Integer, nil] lifetime passed to Redis EXPIRE; falls back to
#   +configuration.ttl+ when nil
# @return [Object] the value of +workflow.jobs.each+
def expire_workflow(workflow, ttl = nil)
  ttl ||= configuration.ttl
  workflow_key = "gush.workflows.#{workflow.id}"
  connection_pool.with { |redis| redis.expire(workflow_key, ttl) }

  workflow.jobs.each do |job|
    expire_job(workflow.id, job, ttl)
  end
end

#find_job(workflow_id, job_name) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/gush/client.rb', line 119

# Looks up a stored job and rebuilds it as a Gush::Job.
#
# When +job_name+ matches the "<klass>-<identifier>" pattern the lookup goes
# through find_job_by_klass_and_id; otherwise it goes through
# find_job_by_klass. The raw payload is decoded with Gush::JSON and handed
# to Gush::Job.from_hash.
#
# @param workflow_id [String] id of the workflow the job belongs to
# @param job_name [String] job name, with or without a "-<identifier>" part
# @return [Object, nil] the result of Gush::Job.from_hash, or nil when the
#   lookup returned nil
def find_job(workflow_id, job_name)
  raw =
    if /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)
      find_job_by_klass_and_id(workflow_id, job_name)
    else
      find_job_by_klass(workflow_id, job_name)
    end
  return nil if raw.nil?

  Gush::Job.from_hash(Gush::JSON.decode(raw, symbolize_keys: true))
end

#find_workflow(id) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/gush/client.rb', line 83

# Loads a workflow and its jobs from Redis.
#
# Reads "gush.workflows.<id>", decodes it, gathers every hash value stored
# under keys matching "gush.jobs.<id>.*", decodes those too, and passes both
# to workflow_from_hash.
#
# @param id [String] workflow id
# @return [Object] the result of workflow_from_hash
# @raise [WorkflowNotFound] when no data is stored for the workflow key
def find_workflow(id)
  connection_pool.with do |redis|
    serialized = redis.get("gush.workflows.#{id}")
    raise WorkflowNotFound.new("Workflow with given id doesn't exist") if serialized.nil?

    attributes = Gush::JSON.decode(serialized, symbolize_keys: true)
    job_keys = redis.scan_each(match: "gush.jobs.#{id}.*")

    # Each job key is a hash holding the serialized jobs of one class.
    job_hashes = job_keys.flat_map do |job_key|
      redis.hvals(job_key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
    end

    workflow_from_hash(attributes, job_hashes)
  end
end

#next_free_job_id(workflow_id, job_klass) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/gush/client.rb', line 45

# Generates a job id that is not yet a field of the workflow's per-class
# job hash.
#
# Random UUIDs are drawn until Redis HEXISTS reports that the candidate is
# not present in "gush.jobs.<workflow_id>.<job_klass>".
#
# @param workflow_id [String] id of the workflow the job belongs to
# @param job_klass [String] job class name used in the hash key
# @return [String] an unused UUID
def next_free_job_id(workflow_id, job_klass)
  jobs_key = "gush.jobs.#{workflow_id}.#{job_klass}"

  loop do
    candidate = SecureRandom.uuid
    taken = connection_pool.with { |redis| redis.hexists(jobs_key, candidate) }
    return candidate unless taken
  end
end

#next_free_workflow_id ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/gush/client.rb', line 60

# Generates a workflow id that is not yet in use.
#
# Random UUIDs are drawn until no "gush.workflows.<id>" key exists in Redis;
# this is the key that persist_workflow writes and find_workflow reads.
#
# @return [String] an unused UUID
def next_free_workflow_id
  loop do
    id = SecureRandom.uuid
    workflow_key = "gush.workflows.#{id}"

    taken = connection_pool.with do |redis|
      # Newer redis-rb releases return an Integer from #exists, and 0 is
      # truthy in Ruby, so negating it would never report a free id. Prefer
      # the boolean #exists? when the client provides it and fall back to
      # #exists only on clients that lack it.
      redis.respond_to?(:exists?) ? redis.exists?(workflow_key) : redis.exists(workflow_key)
    end

    return id unless taken
  end
end

#persist_job(workflow_id, job) ⇒ Object



113
114
115
116
117
# File 'lib/gush/client.rb', line 113

# Stores a job's JSON in Redis.
#
# The job is written as field +job.id+ of the hash
# "gush.jobs.<workflow_id>.<job.klass>".
#
# @param workflow_id [String] id of the workflow the job belongs to
# @param job [#klass, #id, #to_json] job to store
# @return [Object] the result of the Redis HSET call
def persist_job(workflow_id, job)
  jobs_key = "gush.jobs.#{workflow_id}.#{job.klass}"
  connection_pool.with { |redis| redis.hset(jobs_key, job.id, job.to_json) }
end

#persist_workflow(workflow) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/gush/client.rb', line 102

# Stores a workflow and all of its jobs in Redis.
#
# Writes the workflow's JSON to "gush.workflows.<id>", calls persist_job for
# each job, and then calls +mark_as_persisted+ on the workflow.
#
# @param workflow [#id, #to_json, #jobs, #mark_as_persisted] workflow to store
# @return [true]
def persist_workflow(workflow)
  workflow_key = "gush.workflows.#{workflow.id}"
  connection_pool.with { |redis| redis.set(workflow_key, workflow.to_json) }

  workflow.jobs.each do |job|
    persist_job(workflow.id, job)
  end
  workflow.mark_as_persisted

  true
end

#start_workflow(workflow, job_names = []) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/gush/client.rb', line 24

# Marks a workflow as started, persists it, and enqueues jobs.
#
# With no +job_names+, the jobs returned by +workflow.initial_jobs+ are
# enqueued; otherwise each name is looked up with +workflow.find_job+ and
# those jobs are enqueued.
#
# @param workflow [#mark_as_started, #initial_jobs, #find_job, #id] workflow to start
# @param job_names [Array] names of specific jobs to enqueue
# @return [Object] the collection of jobs that was iterated
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  jobs_to_run =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |job_name| workflow.find_job(job_name) }
    end

  jobs_to_run.each { |job| enqueue_job(workflow.id, job) }
end

#stop_workflow(id) ⇒ Object



39
40
41
42
43
# File 'lib/gush/client.rb', line 39

# Loads a workflow, marks it as stopped, and persists it again.
#
# @param id [String] id of the workflow to stop
# @return [Object] the result of persist_workflow
def stop_workflow(id)
  persist_workflow(find_workflow(id).tap(&:mark_as_stopped))
end