Class: Qyu::Store::Memory::Adapter
- Defined in:
- lib/qyu/store/memory/adapter.rb
Constant Summary collapse
- TYPE = :memory
Class Method Summary collapse
- .valid_config?(_config) ⇒ Boolean
Instance Method Summary collapse
- #clear_completed_jobs ⇒ Object
- #count_jobs ⇒ Object
- #delete_job(id) ⇒ Object
- #delete_workflow(id) ⇒ Object
- #delete_workflow_by_name(name) ⇒ Object
- #find_job(id) ⇒ Object
- #find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object
- #find_task(id) ⇒ Object
  Task methods.
- #find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object
- #find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object
- #find_workflow(id) ⇒ Object
- #find_workflow_by_name(name) ⇒ Object
- #initialize(_config) ⇒ Adapter (constructor)
  A new instance of Adapter.
- #lock_task!(id, lease_time) ⇒ Object
- #persist_job(workflow, payload) ⇒ Object
- #persist_workflow(name, descriptor) ⇒ Object
- #renew_lock_lease(id, lease_time, lease_token) ⇒ Object
- #select_jobs(limit, offset, order = :asc) ⇒ Object
- #select_tasks_by_job_id(job_id) ⇒ Object
- #task_status_counts(job_id) ⇒ Object
- #transaction ⇒ Object
- #unlock_task!(id, lease_token) ⇒ Object
- #update_status(id, status) ⇒ Object
Constructor Details
#initialize(_config) ⇒ Adapter
Returns a new instance of Adapter.
9 10 11 12 13 14 15 |
# File 'lib/qyu/store/memory/adapter.rb', line 9
#
# Sets up the empty in-memory collections that back this store.
#
# @param _config [Object] accepted for interface compatibility; not read here
def initialize(_config)
  # Serialises access to @locks in lock_task!, unlock_task! and renew_lock_lease.
  @semaphore = Mutex.new
  @locks     = {}
  @tasks     = {}
  @jobs      = {}
  @workflows = {}
end
Class Method Details
.valid_config?(_config) ⇒ Boolean
17 18 19 20 |
# File 'lib/qyu/store/memory/adapter.rb', line 17
#
# Reports whether a configuration is acceptable for this adapter.
# No validation is implemented yet, so every configuration is accepted.
#
# @param _config [Object] currently ignored
# @return [Boolean] always true
def self.valid_config?(_config)
  # TODO
  true
end
Instance Method Details
#clear_completed_jobs ⇒ Object
76 77 78 |
# File 'lib/qyu/store/memory/adapter.rb', line 76
#
# Placeholder: not implemented for the memory store. Does nothing.
#
# @return [nil]
def clear_completed_jobs
  # TODO
  nil
end
#count_jobs ⇒ Object
80 81 82 |
# File 'lib/qyu/store/memory/adapter.rb', line 80
#
# Number of jobs currently held in the store.
#
# @return [Integer]
def count_jobs
  @jobs.size
end
#delete_job(id) ⇒ Object
72 73 74 |
# File 'lib/qyu/store/memory/adapter.rb', line 72
#
# Removes the job stored under +id+. Tasks are not touched.
#
# @param id [Object] job id
# @return [Hash, nil] the removed job attributes, or nil if the id was unknown
def delete_job(id)
  @jobs.delete(id)
end
#delete_workflow(id) ⇒ Object
42 43 44 |
# File 'lib/qyu/store/memory/adapter.rb', line 42
#
# Removes the workflow stored under +id+.
#
# @param id [Object] workflow id
# @return [Hash, nil] the removed workflow, or nil if the id was unknown
def delete_workflow(id)
  @workflows.delete(id)
end
#delete_workflow_by_name(name) ⇒ Object
46 47 48 49 50 |
# File 'lib/qyu/store/memory/adapter.rb', line 46
#
# Looks a workflow up by name and, when one is found, deletes it by its id.
#
# @param name [Object] workflow name to look up
# @return [Hash, nil] whatever delete_workflow returns, or nil when the
#   lookup yields nothing
def delete_workflow_by_name(name)
  found = find_workflow_by_name(name)
  delete_workflow(found['id']) if found
end
#find_job(id) ⇒ Object
52 53 54 |
# File 'lib/qyu/store/memory/adapter.rb', line 52
#
# Fetches the stored attributes of a job.
#
# @param id [Object] job id
# @return [Hash, nil] the job attributes, or nil if the id is unknown
def find_job(id)
  @jobs[id]
end
#find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/qyu/store/memory/adapter.rb', line 89
#
# Returns the id of an existing task whose job id, name, payload, queue name
# and parent task id all equal the arguments. If none exists, stores a new
# task with status Qyu::Status::QUEUED and returns its freshly generated id.
#
# The block is only invoked for newly stored tasks, never for matches.
#
# @param name [Object] task name
# @param queue_name [Object] queue the task belongs to
# @param payload [Object] task payload (compared with ==)
# @param job_id [Object] owning job id
# @param parent_task_id [Object, nil] id of the parent task, if any
# @yield [id] called once with the new task id after it has been stored
# @return [Object] id of the matching or newly stored task
def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  existing = @tasks.find do |_id, attrs|
    attrs['job_id'] == job_id &&
      attrs['name'] == name &&
      attrs['payload'] == payload &&
      attrs['queue_name'] == queue_name &&
      attrs['parent_task_id'] == parent_task_id
  end
  return existing.first if existing

  id = Qyu::Utils.uuid
  @tasks[id] = {
    'name' => name,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id,
    'status' => Qyu::Status::QUEUED,
    'payload' => payload,
    'job_id' => job_id
  }
  # Guarded so a block-less call returns the id instead of raising
  # LocalJumpError after the task has already been written to @tasks.
  yield(id) if block_given?
  id
end
#find_task(id) ⇒ Object
Task methods
85 86 87 |
# File 'lib/qyu/store/memory/adapter.rb', line 85
#
# Fetches the stored attributes of a task.
#
# @param id [Object] task id
# @return [Hash, nil] the task attributes, or nil if the id is unknown
def find_task(id)
  @tasks[id]
end
#find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object
112 113 114 115 116 |
# File 'lib/qyu/store/memory/adapter.rb', line 112
#
# Collects the ids of every task that belongs to +job_id+ and is named +name+.
#
# @param job_id [Object] owning job id
# @param name [Object] task name
# @return [Array] matching task ids, in the store's insertion order
def find_task_ids_by_job_id_and_name(job_id, name)
  @tasks.each_with_object([]) do |(task_id, attrs), ids|
    ids << task_id if attrs['job_id'] == job_id && attrs['name'] == name
  end
end
#find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/qyu/store/memory/adapter.rb', line 118
#
# Collects the ids of every task that belongs to +job_id+, is named +name+,
# and whose parent task id is one of +parent_task_ids+.
#
# @param job_id [Object] owning job id
# @param name [Object] task name
# @param parent_task_ids [#include?] acceptable parent task ids
# @return [Array] matching task ids, in the store's insertion order
def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  @tasks.each_with_object([]) do |(task_id, attrs), ids|
    next unless attrs['job_id'] == job_id && attrs['name'] == name

    ids << task_id if parent_task_ids.include?(attrs['parent_task_id'])
  end
end
#find_workflow(id) ⇒ Object
22 23 24 |
# File 'lib/qyu/store/memory/adapter.rb', line 22
#
# Fetches a stored workflow by id.
#
# @param id [Object] workflow id
# @return [Hash, nil] the workflow, or nil if the id is unknown
def find_workflow(id)
  @workflows[id]
end
#find_workflow_by_name(name) ⇒ Object
26 27 28 29 30 |
# File 'lib/qyu/store/memory/adapter.rb', line 26
#
# Returns the first stored workflow whose 'name' equals +name+.
#
# Searches the values directly, so a miss yields nil instead of calling
# #last on a nil lookup result and raising NoMethodError.
# delete_workflow_by_name depends on that nil to return early.
#
# @param name [Object] workflow name to look up
# @return [Hash, nil] the matching workflow, or nil when none matches
def find_workflow_by_name(name)
  @workflows.each_value.find { |workflow| workflow['name'] == name }
end
#lock_task!(id, lease_time) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/qyu/store/memory/adapter.rb', line 135
#
# Tries to take the lock on task +id+. The lock is granted when no lock is
# recorded for the task or the recorded one has a locked_until in the past.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time to
#   compute the expiry
# @return [Array] [lease_token, locked_until] on success, [nil, nil] when the
#   task is currently locked
def lock_task!(id, lease_time)
  # The token is generated before entering the critical section, whether or
  # not the lock ends up being granted.
  token = Qyu::Utils.uuid

  @semaphore.synchronize do
    current = @locks[id]
    next [nil, nil] unless current.nil? || current[:locked_until] < Time.now

    expires_at = Qyu::Utils.seconds_after_time(lease_time)
    @locks[id] = { locked_by: token, locked_until: expires_at }
    [token, expires_at]
  end
end
#persist_job(workflow, payload) ⇒ Object
63 64 65 66 67 68 69 70 |
# File 'lib/qyu/store/memory/adapter.rb', line 63
#
# Stores a new job under a freshly generated id.
#
# @param workflow [Object] stored under the 'workflow' key
# @param payload [Object] stored under the 'payload' key
# @return [Object] the generated job id
def persist_job(workflow, payload)
  Qyu::Utils.uuid.tap do |job_id|
    @jobs[job_id] = { 'payload' => payload, 'workflow' => workflow }
  end
end
#persist_workflow(name, descriptor) ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/qyu/store/memory/adapter.rb', line 32
#
# Stores a new workflow under a freshly generated id. The id is also kept
# inside the stored hash under 'id'.
#
# @param name [Object] workflow name
# @param descriptor [Object] workflow descriptor
# @return [Object] the generated workflow id
def persist_workflow(name, descriptor)
  Qyu::Utils.uuid.tap do |workflow_id|
    @workflows[workflow_id] = {
      'id' => workflow_id,
      'name' => name,
      'descriptor' => descriptor
    }
  end
end
#renew_lock_lease(id, lease_time, lease_token) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/qyu/store/memory/adapter.rb', line 168
#
# Extends the lease on task +id+ when +lease_token+ owns the lock and the
# lease has not expired yet.
#
# A task with no recorded lock (never locked, or already unlocked) is
# treated as "not renewable" and yields nil, rather than raising
# NoMethodError from indexing into nil.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time to
#   compute the new expiry
# @param lease_token [Object] token returned by lock_task!
# @return [Object, nil] the new locked_until value, or nil when the lease
#   was not renewed
def renew_lock_lease(id, lease_time, lease_token)
  @semaphore.synchronize do
    lock = @locks[id]
    next nil if lock.nil?
    next nil unless lock[:locked_by] == lease_token && Time.now <= lock[:locked_until]

    locked_until = Qyu::Utils.seconds_after_time(lease_time)
    @locks[id] = { locked_by: lease_token, locked_until: locked_until }
    locked_until
  end
end
#select_jobs(limit, offset, order = :asc) ⇒ Object
56 57 58 59 60 61 |
# File 'lib/qyu/store/memory/adapter.rb', line 56
#
# Returns one page of jobs, each as its stored attributes merged with an
# :id entry.
#
# NOTE(review): the page is sliced in insertion order first and only then
# reversed for non-:asc orders, so :desc reverses within the page rather
# than paging from the newest job. Left unchanged; confirm intended.
#
# @param limit [Integer] maximum number of jobs to return
# @param offset [Integer] number of jobs to skip from the start
# @param order [Symbol] :asc keeps slice order; anything else reverses it
# @return [Array<Hash>] the selected jobs; empty when offset is past the end
def select_jobs(limit, offset, order = :asc)
  # Array#[start, length] is nil when start is beyond the array's end;
  # treat that as an empty page instead of crashing on nil.map.
  ids = @jobs.keys[offset, limit] || []
  selected = ids.map { |id| { id: id }.merge(@jobs[id]) }
  order == :asc ? selected : selected.reverse
end
#select_tasks_by_job_id(job_id) ⇒ Object
126 127 128 |
# File 'lib/qyu/store/memory/adapter.rb', line 126
#
# Returns every task that belongs to +job_id+, each as a copy of its stored
# attributes with the task id added under 'id'.
#
# @param job_id [Object] owning job id
# @return [Array<Hash>] matching tasks, in the store's insertion order
def select_tasks_by_job_id(job_id)
  @tasks.each_with_object([]) do |(task_id, attrs), rows|
    rows << attrs.merge('id' => task_id) if attrs['job_id'] == job_id
  end
end
#task_status_counts(job_id) ⇒ Object
130 131 132 133 |
# File 'lib/qyu/store/memory/adapter.rb', line 130
#
# Placeholder: status counting is not implemented for the memory store.
#
# @param job_id [Object] currently ignored
# @return [Hash] always an empty hash
def task_status_counts(job_id)
  # TODO
  {}
end
#transaction ⇒ Object
180 181 182 183 |
# File 'lib/qyu/store/memory/adapter.rb', line 180
#
# Runs the given block. No transactional behaviour is implemented yet; the
# block is simply executed.
#
# @yield the work to run
# @return [Object] the block's return value
def transaction
  # TODO
  yield
end
#unlock_task!(id, lease_token) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/qyu/store/memory/adapter.rb', line 152
#
# Releases the lock on task +id+ when +lease_token+ owns it.
#
# A task with no recorded lock (never locked, or already unlocked) yields
# false rather than raising NoMethodError from indexing into nil.
#
# @param id [Object] task id
# @param lease_token [Object] token returned by lock_task!
# @return [Boolean] true when the lock was removed, false otherwise
def unlock_task!(id, lease_token)
  @semaphore.synchronize do
    lock = @locks[id]
    next false unless lock && lock[:locked_by] == lease_token

    @locks.delete(id)
    true
  end
end
#update_status(id, status) ⇒ Object
164 165 166 |
# File 'lib/qyu/store/memory/adapter.rb', line 164
#
# Overwrites the 'status' entry of the task stored under +id+.
#
# @param id [Object] id of an existing task
# @param status [Object] new status value
# @return [Object] the status that was assigned
def update_status(id, status)
  task = @tasks[id]
  task['status'] = status
end