Class: Qyu::Store::Memory::Adapter

Inherits:
Base
  • Object
show all
Defined in:
lib/qyu/store/memory/adapter.rb

Constant Summary collapse

TYPE =
:memory

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_config) ⇒ Adapter

Returns a new instance of Adapter.



9
10
11
12
13
14
15
# File 'lib/qyu/store/memory/adapter.rb', line 9

def initialize(_config)
  @workflows = {}
  @jobs = {}
  @tasks = {}
  @locks = {}
  @semaphore = Mutex.new
end

Class Method Details

.valid_config?(_config) ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
20
# File 'lib/qyu/store/memory/adapter.rb', line 17

def self.valid_config?(_config)
  # TODO
  true
end

Instance Method Details

#clear_completed_jobsObject



76
77
78
# File 'lib/qyu/store/memory/adapter.rb', line 76

def clear_completed_jobs
  # TODO
end

#count_jobsObject



80
81
82
# File 'lib/qyu/store/memory/adapter.rb', line 80

def count_jobs
  @jobs.count
end

#delete_job(id) ⇒ Object



72
73
74
# File 'lib/qyu/store/memory/adapter.rb', line 72

def delete_job(id)
  @jobs.delete(id)
end

#delete_workflow(id) ⇒ Object



42
43
44
# File 'lib/qyu/store/memory/adapter.rb', line 42

def delete_workflow(id)
  @workflows.delete(id)
end

#delete_workflow_by_name(name) ⇒ Object



46
47
48
49
50
# File 'lib/qyu/store/memory/adapter.rb', line 46

def delete_workflow_by_name(name)
  workflow = find_workflow_by_name(name)
  return unless workflow
  delete_workflow(workflow['id'])
end

#find_job(id) ⇒ Object



52
53
54
# File 'lib/qyu/store/memory/adapter.rb', line 52

def find_job(id)
  @jobs[id]
end

#find_or_persist_task(name, queue_name, payload, job_id, parent_task_id) {|id| ... } ⇒ Object

Yields:

  • (id)


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/qyu/store/memory/adapter.rb', line 89

def find_or_persist_task(name, queue_name, payload, job_id, parent_task_id)
  matching_task = @tasks.detect do |_id, attrs|
    attrs['job_id'] == job_id \
    && attrs['name'] == name \
    && attrs['payload'] == payload \
    && attrs['queue_name'] == queue_name \
    && attrs['parent_task_id'] == parent_task_id
  end
  return matching_task[0] if matching_task

  id = Qyu::Utils.uuid
  @tasks[id] = {
    'name' => name,
    'queue_name' => queue_name,
    'parent_task_id' => parent_task_id,
    'status' => Qyu::Status::QUEUED,
    'payload' => payload,
    'job_id' => job_id
  }
  yield(id)
  id
end

#find_task(id) ⇒ Object

Task methods



85
86
87
# File 'lib/qyu/store/memory/adapter.rb', line 85

def find_task(id)
  @tasks[id]
end

#find_task_ids_by_job_id_and_name(job_id, name) ⇒ Object



112
113
114
115
116
# File 'lib/qyu/store/memory/adapter.rb', line 112

def find_task_ids_by_job_id_and_name(job_id, name)
  @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id && attrs['name'] == name
  end.map { |(id, _attr)| id }
end

#find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids) ⇒ Object



118
119
120
121
122
123
124
# File 'lib/qyu/store/memory/adapter.rb', line 118

def find_task_ids_by_job_id_name_and_parent_task_ids(job_id, name, parent_task_ids)
  @tasks.select do |_id, attrs|
    attrs['job_id'] == job_id &&
    attrs['name'] == name &&
    parent_task_ids.include?(attrs['parent_task_id'])
  end.map { |(id, _attr)| id }
end

#find_workflow(id) ⇒ Object



22
23
24
# File 'lib/qyu/store/memory/adapter.rb', line 22

def find_workflow(id)
  @workflows[id]
end

#find_workflow_by_name(name) ⇒ Object



26
27
28
29
30
# File 'lib/qyu/store/memory/adapter.rb', line 26

def find_workflow_by_name(name)
  @workflows.detect do |_id, wflow|
    wflow['name'] == name
  end.last
end

#lock_task!(id, lease_time) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/qyu/store/memory/adapter.rb', line 135

def lock_task!(id, lease_time)
  uuid = Qyu::Utils.uuid
  locked = false
  locked_until = nil
  @semaphore.synchronize do
    if @locks[id].nil? || @locks[id][:locked_until] < Time.now
      locked_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: uuid, locked_until: locked_until }
      locked = true
    end
  end

  return [nil, nil] unless locked

  [uuid, locked_until]
end

#persist_job(workflow, payload) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/qyu/store/memory/adapter.rb', line 63

def persist_job(workflow, payload)
  id = Qyu::Utils.uuid
  @jobs[id] = {
    'payload'  => payload,
    'workflow' => workflow
  }
  id
end

#persist_workflow(name, descriptor) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/qyu/store/memory/adapter.rb', line 32

def persist_workflow(name, descriptor)
  id = Qyu::Utils.uuid
  @workflows[id] = {
    'id'         => id,
    'name'       => name,
    'descriptor' => descriptor
  }
  id
end

#renew_lock_lease(id, lease_time, lease_token) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
# File 'lib/qyu/store/memory/adapter.rb', line 168

def renew_lock_lease(id, lease_time, lease_token)
  locked_until = nil
  @semaphore.synchronize do
    if @locks[id][:locked_by] == lease_token && Time.now <= @locks[id][:locked_until]
      locked_until = Qyu::Utils.seconds_after_time(lease_time)
      @locks[id] = { locked_by: lease_token, locked_until: locked_until }
    end
  end

  locked_until
end

#select_jobs(limit, offset, order = :asc) ⇒ Object



56
57
58
59
60
61
# File 'lib/qyu/store/memory/adapter.rb', line 56

def select_jobs(limit, offset, order = :asc)
  ids = @jobs.keys[offset, limit]
  selected = ids.map { |id| { id: id }.merge(@jobs[id]) }
  return selected if order == :asc
  selected.reverse
end

#select_tasks_by_job_id(job_id) ⇒ Object



126
127
128
# File 'lib/qyu/store/memory/adapter.rb', line 126

def select_tasks_by_job_id(job_id)
  @tasks.select { |_id, attrs| attrs['job_id'] == job_id }.map { |id, attrs| attrs.merge('id' => id) }
end

#task_status_counts(job_id) ⇒ Object



130
131
132
133
# File 'lib/qyu/store/memory/adapter.rb', line 130

def task_status_counts(job_id)
  # TODO
  {}
end

#transactionObject



180
181
182
183
# File 'lib/qyu/store/memory/adapter.rb', line 180

def transaction
  # TODO
  yield
end

#unlock_task!(id, lease_token) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
# File 'lib/qyu/store/memory/adapter.rb', line 152

def unlock_task!(id, lease_token)
  unlocked = false
  @semaphore.synchronize do
    if @locks[id][:locked_by] == lease_token
      @locks.delete(id)
      unlocked = true
    end
  end

  unlocked
end

#update_status(id, status) ⇒ Object



164
165
166
# File 'lib/qyu/store/memory/adapter.rb', line 164

def update_status(id, status)
  @tasks[id]['status'] = status
end