Class: Qyu::Store::Memory::Adapter
- Defined in:
- lib/qyu/store/memory/adapter.rb
Overview
Qyu::Store::Memory::Adapter
Constant Summary
- TYPE = :memory
Class Method Summary
- .valid_config?(_config) ⇒ Boolean
Instance Method Summary
- #clear_completed_jobs ⇒ Object
- #count_jobs ⇒ Object
- #delete_job(id) ⇒ Object
- #delete_workflow(id) ⇒ Object
- #delete_workflow_by_name(name) ⇒ Object
- #find_job(id) ⇒ Object
- #find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object
- #find_task(id) ⇒ Object — Task methods.
- #find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object
- #find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object
- #find_workflow(id) ⇒ Object
- #find_workflow_by_name(name) ⇒ Object
- #initialize(_config) ⇒ Adapter (constructor) — A new instance of Adapter.
- #lock_task!(id, lease_time) ⇒ Object
- #persist_job(workflow, payload) ⇒ Object
- #persist_workflow(name, descriptor) ⇒ Object
- #renew_lock_lease(id, lease_time, lease_token) ⇒ Object
- #select_jobs(limit, offset, order = :asc) ⇒ Object
- #select_tasks_by_job_id(job_id) ⇒ Object
- #task_status_counts(job_id) ⇒ Object
- #transaction ⇒ Object
- #unlock_task!(id, lease_token) ⇒ Object
- #update_status(id, status) ⇒ Object
Constructor Details
#initialize(_config) ⇒ Adapter
Returns a new instance of Adapter.
10 11 12 13 14 15 16 |
# File 'lib/qyu/store/memory/adapter.rb', line 10
#
# Builds an adapter whose state is held in instance variables only.
#
# @param _config [Object] ignored by this adapter
def initialize(_config)
  # Mutex used by the lock methods to serialise access to @locks.
  @semaphore = Mutex.new
  # Four independent, initially empty hashes, one per kind of record.
  @workflows, @jobs, @tasks, @locks = {}, {}, {}, {}
end
Class Method Details
.valid_config?(_config) ⇒ Boolean
18 19 20 21 |
# File 'lib/qyu/store/memory/adapter.rb', line 18
#
# Reports whether a configuration is acceptable for this adapter.
# No validation is implemented yet, so every configuration is accepted.
#
# @param _config [Object] ignored
# @return [Boolean] always +true+
def self.valid_config?(_config)
  # TODO: validate the configuration
  true
end
Instance Method Details
#clear_completed_jobs ⇒ Object
77 78 79 |
# File 'lib/qyu/store/memory/adapter.rb', line 77
#
# Placeholder: not implemented yet. Performs no work and leaves the
# stored jobs untouched.
#
# @return [nil]
def clear_completed_jobs
  # TODO: remove completed jobs
  nil
end
#count_jobs ⇒ Object
81 82 83 |
# File 'lib/qyu/store/memory/adapter.rb', line 81
#
# @return [Integer] number of jobs currently stored
def count_jobs
  @jobs.size
end
#delete_job(id) ⇒ Object
73 74 75 |
# File 'lib/qyu/store/memory/adapter.rb', line 73
#
# Removes the job stored under +id+.
#
# @param id [Object] job id
# @return [Hash, nil] the removed job attributes, or nil when no job
#   was stored under +id+
def delete_job(id)
  @jobs.delete(id)
end
#delete_workflow(id) ⇒ Object
43 44 45 |
# File 'lib/qyu/store/memory/adapter.rb', line 43
#
# Removes the workflow stored under +id+.
#
# @param id [Object] workflow id
# @return [Hash, nil] the removed workflow attributes, or nil when no
#   workflow was stored under +id+
def delete_workflow(id)
  @workflows.delete(id)
end
#delete_workflow_by_name(name) ⇒ Object
47 48 49 50 51 |
# File 'lib/qyu/store/memory/adapter.rb', line 47
#
# Looks a workflow up by name and, when one is found, deletes it by its
# 'id' attribute.
#
# @param name [Object] workflow name to look up
# @return [Object, nil] the result of #delete_workflow, or nil when
#   #find_workflow_by_name returned nothing
def delete_workflow_by_name(name)
  found = find_workflow_by_name(name)
  delete_workflow(found['id']) if found
end
#find_job(id) ⇒ Object
53 54 55 |
# File 'lib/qyu/store/memory/adapter.rb', line 53
#
# @param id [Object] job id
# @return [Hash, nil] the attributes stored for the job, or nil when
#   nothing is stored under +id+
def find_job(id)
  @jobs[id]
end
#find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/qyu/store/memory/adapter.rb', line 90
#
# Returns the id of the stored task whose job id, name, payload, queue
# name and parent task id all equal the given values. When no such task
# exists, a new task with status Qyu::Status::QUEUED is stored under a
# fresh Qyu::Utils.uuid and that id is returned.
#
# @param name [Object] task name
# @param queue_name [Object] queue name
# @param payload [Object] task payload (compared with ==)
# @param job_id [Object] id of the owning job
# @param parent_task_id [Object] id of the parent task
# @yieldparam id [Object] id of the task that was just stored; the block
#   is invoked only when a new task is created
# @return [Object] id of the matching or newly stored task
def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  wanted = {
    'job_id' => job_id,
    'name' => name,
    'payload' => payload,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id
  }
  existing = @tasks.find do |_id, attrs|
    wanted.all? { |key, value| attrs[key] == value }
  end
  # Each entry is an [id, attrs] pair; hand back the id of the match.
  return existing.first if existing

  new_id = Qyu::Utils.uuid
  @tasks[new_id] = {
    'name' => name,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id,
    'status' => Qyu::Status::QUEUED,
    'payload' => payload,
    'job_id' => job_id
  }
  yield(new_id)
  new_id
end
#find_task(id) ⇒ Object
Task methods
86 87 88 |
# File 'lib/qyu/store/memory/adapter.rb', line 86
#
# Task methods
#
# @param id [Object] task id
# @return [Hash, nil] the attributes stored for the task, or nil when
#   nothing is stored under +id+
def find_task(id)
  @tasks[id]
end
#find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object
113 114 115 116 117 |
# File 'lib/qyu/store/memory/adapter.rb', line 113
#
# Collects the ids of all tasks that belong to the given job and carry
# the given name.
#
# @param job_id [Object] job id to match against 'job_id'
# @param name [Object] task name to match against 'name'
# @return [Array] matching task ids, in storage order
def find_task_ids_by_job_id_and_name(job_id, name)
  matching = @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id && attrs['name'] == name
  end
  matching.keys
end
#find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object
119 120 121 122 123 124 125 |
# File 'lib/qyu/store/memory/adapter.rb', line 119
#
# Collects the ids of all tasks that belong to the given job, carry the
# given name, and whose 'parent_task_id' is one of +parent_task_ids+.
#
# @param job_id [Object] job id to match against 'job_id'
# @param name [Object] task name to match against 'name'
# @param parent_task_ids [#include?] accepted parent task ids
# @return [Array] matching task ids, in storage order
def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  matching = @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id &&
      attrs['name'] == name &&
      parent_task_ids.include?(attrs['parent_task_id'])
  end
  matching.keys
end
#find_workflow(id) ⇒ Object
23 24 25 |
# File 'lib/qyu/store/memory/adapter.rb', line 23
#
# @param id [Object] workflow id
# @return [Hash, nil] the attributes stored for the workflow, or nil
#   when nothing is stored under +id+
def find_workflow(id)
  @workflows[id]
end
#find_workflow_by_name(name) ⇒ Object
27 28 29 30 31 |
# File 'lib/qyu/store/memory/adapter.rb', line 27
#
# Finds the first stored workflow whose 'name' attribute equals +name+.
#
# Returns nil when no workflow matches, rather than raising
# NoMethodError from calling +last+ on a nil search result. Callers
# such as #delete_workflow_by_name guard on a nil result.
#
# @param name [Object] workflow name to look up
# @return [Hash, nil] the workflow attributes, or nil when not found
def find_workflow_by_name(name)
  @workflows.each_value.find { |workflow| workflow['name'] == name }
end
#lock_task!(id, lease_time) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/qyu/store/memory/adapter.rb', line 136
#
# Tries to take the lock for a task. The lock is granted when no lock
# is recorded for +id+ or when the recorded lock's :locked_until lies
# before the current time.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time to
#   compute the expiry (presumably a number of seconds; confirm there)
# @return [Array] +[lease_token, locked_until]+ when the lock was
#   taken, +[nil, nil]+ otherwise
def lock_task!(id, lease_time)
  lease_token = Qyu::Utils.uuid

  # The check and the write happen under the mutex so they cannot
  # interleave with another lock operation on this adapter.
  @semaphore.synchronize do
    held = @locks[id]
    available = held.nil? || held[:locked_until] < Time.now
    next [nil, nil] unless available

    expires_at = Qyu::Utils.seconds_after_time(lease_time)
    @locks[id] = { locked_by: lease_token, locked_until: expires_at }
    [lease_token, expires_at]
  end
end
#persist_job(workflow, payload) ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/qyu/store/memory/adapter.rb', line 64
#
# Stores a new job under a freshly generated Qyu::Utils.uuid.
#
# @param workflow [Object] stored under the 'workflow' key
# @param payload [Object] stored under the 'payload' key
# @return [Object] the generated job id
def persist_job(workflow, payload)
  Qyu::Utils.uuid.tap do |job_id|
    @jobs[job_id] = { 'payload' => payload, 'workflow' => workflow }
  end
end
#persist_workflow(name, descriptor) ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/qyu/store/memory/adapter.rb', line 33
#
# Stores a new workflow under a freshly generated Qyu::Utils.uuid. The
# stored attributes include the id itself under the 'id' key.
#
# @param name [Object] stored under the 'name' key
# @param descriptor [Object] stored under the 'descriptor' key
# @return [Object] the generated workflow id
def persist_workflow(name, descriptor)
  Qyu::Utils.uuid.tap do |workflow_id|
    @workflows[workflow_id] = {
      'id' => workflow_id,
      'name' => name,
      'descriptor' => descriptor
    }
  end
end
#renew_lock_lease(id, lease_time, lease_token) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/qyu/store/memory/adapter.rb', line 169
#
# Extends the lease of a lock that is held with +lease_token+ and has
# not yet expired.
#
# A task without any recorded lock is treated like any other lease that
# cannot be renewed: nil is returned rather than raising NoMethodError
# on the missing entry.
#
# @param id [Object] task id
# @param lease_time [Object] passed to Qyu::Utils.seconds_after_time to
#   compute the new expiry (presumably a number of seconds; confirm there)
# @param lease_token [Object] token returned when the lock was taken
# @return [Object, nil] the new expiry, or nil when the lease was not
#   renewed (no lock, different token, or already expired)
def renew_lock_lease(id, lease_time, lease_token)
  renewed_until = nil

  # The check and the write happen under the mutex so they cannot
  # interleave with another lock operation on this adapter.
  @semaphore.synchronize do
    lock = @locks[id]
    if lock && lock[:locked_by] == lease_token && Time.now <= lock[:locked_until]
      renewed_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: lease_token, locked_until: renewed_until }
    end
  end

  renewed_until
end
#select_jobs(limit, offset, order = :asc) ⇒ Object
57 58 59 60 61 62 |
# File 'lib/qyu/store/memory/adapter.rb', line 57
#
# Returns one page of stored jobs. Each entry is the job's attributes
# merged into a hash that carries the job id under the Symbol key :id.
#
# Jobs are ordered by insertion (Hash key order) for +:asc+ and by
# reverse insertion for any other +order+. The ordering is applied
# before +offset+ and +limit+, so paging through a descending listing
# starts from the most recently stored job. An offset beyond the last
# job, or a negative limit, yields an empty array.
#
# @param limit [Integer] maximum number of jobs to return
# @param offset [Integer] number of jobs to skip in the chosen order
# @param order [Symbol] +:asc+ for insertion order; anything else
#   reverses it
# @return [Array<Hash>] the selected jobs
def select_jobs(limit, offset, order = :asc)
  ids = @jobs.keys
  ids = ids.reverse unless order == :asc
  # Array#[] with (start, length) returns nil when the range is out of
  # bounds; treat that as an empty page.
  page = ids[offset, limit] || []
  page.map { |id| { id: id }.merge(@jobs[id]) }
end
#select_tasks_by_job_id(job_id) ⇒ Object
127 128 129 |
# File 'lib/qyu/store/memory/adapter.rb', line 127
#
# Returns every task of the given job. Each entry is a copy of the
# task's attributes with the task id added under the 'id' key; the
# stored attributes themselves are not modified.
#
# @param job_id [Object] job id to match against 'job_id'
# @return [Array<Hash>] matching tasks, in storage order
def select_tasks_by_job_id(job_id)
  @tasks.each_with_object([]) do |(task_id, attrs), found|
    found << attrs.merge('id' => task_id) if attrs['job_id'] == job_id
  end
end
#task_status_counts(job_id) ⇒ Object
131 132 133 134 |
# File 'lib/qyu/store/memory/adapter.rb', line 131
#
# Placeholder: counting is not implemented yet, so the result is always
# an empty hash regardless of the stored tasks.
#
# @param job_id [Object] currently unused
# @return [Hash] always empty
def task_status_counts(job_id)
  # TODO: count the job's tasks per status
  {}
end
#transaction ⇒ Object
181 182 183 184 |
# File 'lib/qyu/store/memory/adapter.rb', line 181
#
# Placeholder: runs the given block directly. Nothing is rolled back if
# the block raises.
#
# @yield the work to perform
# @return [Object] the block's return value
def transaction
  # TODO: provide real transactional behaviour
  yield
end
#unlock_task!(id, lease_token) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/qyu/store/memory/adapter.rb', line 153
#
# Releases the lock on a task when it is held with +lease_token+.
#
# A task without any recorded lock is reported as "not unlocked"
# (false) rather than raising NoMethodError on the missing entry.
#
# @param id [Object] task id
# @param lease_token [Object] token returned when the lock was taken
# @return [Boolean] true when the lock was removed, false when there
#   was no lock or it is held with a different token
def unlock_task!(id, lease_token)
  # The check and the delete happen under the mutex so they cannot
  # interleave with another lock operation on this adapter.
  @semaphore.synchronize do
    lock = @locks[id]
    next false unless lock && lock[:locked_by] == lease_token

    @locks.delete(id)
    true
  end
end
#update_status(id, status) ⇒ Object
165 166 167 |
# File 'lib/qyu/store/memory/adapter.rb', line 165
#
# Overwrites the 'status' attribute of a stored task.
#
# @param id [Object] task id
# @param status [Object] new status value
# @return [Object] the assigned +status+
# @raise [NoMethodError] when no task is stored under +id+
def update_status(id, status)
  task = @tasks[id]
  task['status'] = status
end