Class: Qyu::Store::Memory::Adapter

Inherits:
Base
  • Object
show all
Defined in:
lib/qyu/store/memory/adapter.rb

Overview

Qyu::Store::Memory::Adapter

Constant Summary collapse

TYPE =
:memory

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_config) ⇒ Adapter


10
11
12
13
14
15
16
# File 'lib/qyu/store/memory/adapter.rb', line 10

def initialize(_config)
  @workflows = {}
  @jobs = {}
  @tasks = {}
  @locks = {}
  @semaphore = Mutex.new
end

Class Method Details

.valid_config?(_config) ⇒ Boolean


18
19
20
21
# File 'lib/qyu/store/memory/adapter.rb', line 18

def self.valid_config?(_config)
  # TODO
  true
end

Instance Method Details

#clear_completed_jobsObject


77
78
79
# File 'lib/qyu/store/memory/adapter.rb', line 77

def clear_completed_jobs
  # TODO
end

#count_jobsObject


81
82
83
# File 'lib/qyu/store/memory/adapter.rb', line 81

def count_jobs
  @jobs.count
end

#delete_job(id) ⇒ Object


73
74
75
# File 'lib/qyu/store/memory/adapter.rb', line 73

def delete_job(id)
  @jobs.delete(id)
end

#delete_workflow(id) ⇒ Object


43
44
45
# File 'lib/qyu/store/memory/adapter.rb', line 43

def delete_workflow(id)
  @workflows.delete(id)
end

#delete_workflow_by_name(name) ⇒ Object


47
48
49
50
51
# File 'lib/qyu/store/memory/adapter.rb', line 47

def delete_workflow_by_name(name)
  workflow = find_workflow_by_name(name)
  return unless workflow
  delete_workflow(workflow['id'])
end

#find_job(id) ⇒ Object


53
54
55
# File 'lib/qyu/store/memory/adapter.rb', line 53

def find_job(id)
  @jobs[id]
end

#find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object

Yields:

  • (id)

90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/qyu/store/memory/adapter.rb', line 90

def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  matching_task = @tasks.detect do |_id, attrs|
    attrs['job_id'] == job_id \
    && attrs['name'] == name \
    && attrs['payload'] == payload \
    && attrs['queue_name'] == queue_name \
    && attrs['parent_task_id'] == parent_task_id
  end
  return matching_task[0] if matching_task

  id = Qyu::Utils.uuid
  @tasks[id] = {
    'name' => name,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id,
    'status' => Qyu::Status::QUEUED,
    'payload' => payload,
    'job_id' => job_id
  }
  yield(id)
  id
end

#find_task(id) ⇒ Object

Task methods


86
87
88
# File 'lib/qyu/store/memory/adapter.rb', line 86

def find_task(id)
  @tasks[id]
end

#find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object


113
114
115
116
117
# File 'lib/qyu/store/memory/adapter.rb', line 113

def find_task_ids_by_job_id_and_name(job_id, name)
  @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id && attrs['name'] == name
  end.map { |(id, _attr)| id }
end

#find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object


119
120
121
122
123
124
125
# File 'lib/qyu/store/memory/adapter.rb', line 119

def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id &&
    attrs['name'] == name &&
    parent_task_ids.include?(attrs['parent_task_id'])
  end.map { |(id, _attr)| id }
end

#find_workflow(id) ⇒ Object


23
24
25
# File 'lib/qyu/store/memory/adapter.rb', line 23

def find_workflow(id)
  @workflows[id]
end

#find_workflow_by_name(name) ⇒ Object


27
28
29
30
31
# File 'lib/qyu/store/memory/adapter.rb', line 27

def find_workflow_by_name(name)
  @workflows.detect do |_id, wflow|
    wflow['name'] == name
  end.last
end

#lock_task!(id, lease_time) ⇒ Object


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/qyu/store/memory/adapter.rb', line 136

def lock_task!(id, lease_time)
  uuid = Qyu::Utils.uuid
  locked = false
  locked_until = nil
  @semaphore.synchronize do
    if @locks[id].nil? || @locks[id][:locked_until] < Time.now
      locked_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: uuid, locked_until: locked_until }
      locked = true
    end
  end

  return [nil, nil] unless locked

  [uuid, locked_until]
end

#persist_job(workflow, payload) ⇒ Object


64
65
66
67
68
69
70
71
# File 'lib/qyu/store/memory/adapter.rb', line 64

def persist_job(workflow, payload)
  id = Qyu::Utils.uuid
  @jobs[id] = {
    'payload'  => payload,
    'workflow' => workflow
  }
  id
end

#persist_workflow(name, descriptor) ⇒ Object


33
34
35
36
37
38
39
40
41
# File 'lib/qyu/store/memory/adapter.rb', line 33

def persist_workflow(name, descriptor)
  id = Qyu::Utils.uuid
  @workflows[id] = {
    'id'         => id,
    'name'       => name,
    'descriptor' => descriptor
  }
  id
end

#renew_lock_lease(id, lease_time, lease_token) ⇒ Object


169
170
171
172
173
174
175
176
177
178
179
# File 'lib/qyu/store/memory/adapter.rb', line 169

def renew_lock_lease(id, lease_time, lease_token)
  locked_until = nil
  @semaphore.synchronize do
    if @locks[id][:locked_by] == lease_token && Time.now <= @locks[id][:locked_until]
      locked_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: lease_token, locked_until: locked_until }
    end
  end

  locked_until
end

#select_jobs(limit, offset, order = :asc) ⇒ Object


57
58
59
60
61
62
# File 'lib/qyu/store/memory/adapter.rb', line 57

def select_jobs(limit, offset, order = :asc)
  ids = @jobs.keys[offset, limit]
  selected = ids.map { |id| { id: id }.merge(@jobs[id]) }
  return selected if order == :asc
  selected.reverse
end

#select_tasks_by_job_id(job_id) ⇒ Object


127
128
129
# File 'lib/qyu/store/memory/adapter.rb', line 127

def select_tasks_by_job_id(job_id)
  @tasks.select { |_id, attrs| attrs['job_id'] == job_id }.map { |id, attrs| attrs.merge('id' => id) }
end

#task_status_counts(job_id) ⇒ Object


131
132
133
134
# File 'lib/qyu/store/memory/adapter.rb', line 131

def task_status_counts(job_id)
  # TODO
  {}
end

#transactionObject


181
182
183
184
# File 'lib/qyu/store/memory/adapter.rb', line 181

def transaction
  # TODO
  yield
end

#unlock_task!(id, lease_token) ⇒ Object


153
154
155
156
157
158
159
160
161
162
163
# File 'lib/qyu/store/memory/adapter.rb', line 153

def unlock_task!(id, lease_token)
  unlocked = false
  @semaphore.synchronize do
    if @locks[id][:locked_by] == lease_token
      @locks.delete(id)
      unlocked = true
    end
  end

  unlocked
end

#update_status(id, status) ⇒ Object


165
166
167
# File 'lib/qyu/store/memory/adapter.rb', line 165

def update_status(id, status)
  @tasks[id]['status'] = status
end