Class: Cellect::Server::Workflow
- Inherits:
-
Object
- Object
- Cellect::Server::Workflow
- Includes:
- Celluloid
- Defined in:
- lib/cellect/server/workflow.rb
Direct Known Subclasses
Constant Summary collapse
- SKIP_RELOAD_STATES =
[ :reloading, :loading, :initializing ].freeze
- RELOAD_TIMEOUT =
ENV.fetch('RELOAD_TIMEOUT', 600).to_i.freeze
- SET_KLASS =
Provide a lookup for matching sets to workflow criteria
{ # priority, pairwise [ false, false ] => DiffSet::RandomSet, [ false, true ] => DiffSet::PairwiseRandomSet, [ true, false ] => DiffSet::PrioritySet, [ true, true ] => DiffSet::PairwisePrioritySet }
Class Attribute Summary collapse
-
.workflow_names ⇒ Object
Returns the value of attribute workflow_names.
Instance Attribute Summary collapse
-
#can_reload_at ⇒ Object
Returns the value of attribute can_reload_at.
-
#name ⇒ Object
Returns the value of attribute name.
-
#pairwise ⇒ Object
Returns the value of attribute pairwise.
-
#prioritized ⇒ Object
Returns the value of attribute prioritized.
-
#state ⇒ Object
Returns the value of attribute state.
-
#subjects ⇒ Object
Returns the value of attribute subjects.
-
#users ⇒ Object
Returns the value of attribute users.
Class Method Summary collapse
-
.[](name) ⇒ Object
Look up and/or load a workflow.
-
.[]=(name, opts) ⇒ Object
Load a workflow.
-
.all ⇒ Object
All currently loaded workflows.
-
.names ⇒ Object
The names of all workflows currently loaded.
Instance Method Summary collapse
-
#add(opts = { }) ⇒ Object
Adds or updates a subject.
-
#add_seen_for(user_id, *subject_ids) ⇒ Object
Add subjects to a users seen set.
- #can_reload_data? ⇒ Boolean
- #data_loader ⇒ Object
- #grouped? ⇒ Boolean
-
#initialize(name, pairwise: false, prioritized: false) ⇒ Workflow
constructor
Sets up a new workflow and starts the data loading.
-
#load_data ⇒ Object
Loads subjects from the adapter.
- #pairwise? ⇒ Boolean
- #prioritized? ⇒ Boolean
- #ready? ⇒ Boolean
-
#reload_data ⇒ Object
Reloads subjects from the adapter.
-
#remove(opts = { }) ⇒ Object
Removes a subject.
-
#remove_user(user_id) ⇒ Object
Unload a user.
-
#sample(opts = { }) ⇒ Object
Get a sample of subjects for a user.
-
#set_klass ⇒ Object
Looks up the set class.
- #set_reload_at_time(time_stamp = Time.now + RELOAD_TIMEOUT) ⇒ Object
-
#status ⇒ Object
General information about this workflow.
-
#unseen_for(user_id, limit: 5) ⇒ Object
Get unseen subjects for a user.
-
#user(id) ⇒ Object
Look up and/or load a user.
Constructor Details
#initialize(name, pairwise: false, prioritized: false) ⇒ Workflow
Sets up a new workflow and starts the data loading
44 45 46 47 48 49 50 51 |
# File 'lib/cellect/server/workflow.rb', line 44 def initialize(name, pairwise: false, prioritized: false) self.name = name self.users = { } self.pairwise = !!pairwise self.prioritized = !!prioritized self.subjects ||= set_klass.new self.state = :initializing end |
Class Attribute Details
.workflow_names ⇒ Object
Returns the value of attribute workflow_names.
7 8 9 |
# File 'lib/cellect/server/workflow.rb', line 7 def workflow_names @workflow_names end |
Instance Attribute Details
#can_reload_at ⇒ Object
Returns the value of attribute can_reload_at.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def can_reload_at @can_reload_at end |
#name ⇒ Object
Returns the value of attribute name.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def name @name end |
#pairwise ⇒ Object
Returns the value of attribute pairwise.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def pairwise @pairwise end |
#prioritized ⇒ Object
Returns the value of attribute prioritized.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def prioritized @prioritized end |
#state ⇒ Object
Returns the value of attribute state.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def state @state end |
#subjects ⇒ Object
Returns the value of attribute subjects.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def subjects @subjects end |
#users ⇒ Object
Returns the value of attribute users.
11 12 13 |
# File 'lib/cellect/server/workflow.rb', line 11 def users @users end |
Class Method Details
.[](name) ⇒ Object
Look up and/or load a workflow
18 19 20 21 22 23 |
# File 'lib/cellect/server/workflow.rb', line 18 def self.[](name) Cellect::Server.adapter.load_workflows(name) unless Actor[name] if registered_workflow = Actor[name] registered_workflow.actors.first end end |
.[]=(name, opts) ⇒ Object
Load a workflow
26 27 28 29 30 |
# File 'lib/cellect/server/workflow.rb', line 26 def self.[]=(name, opts) Actor[name] = supervise name, pairwise: opts['pairwise'], prioritized: opts['prioritized'] Workflow.workflow_names[name] = true if Actor[name] Actor[name] end |
.all ⇒ Object
All currently loaded workflows
39 40 41 |
# File 'lib/cellect/server/workflow.rb', line 39 def self.all names.collect{ |name| Workflow[name] }.compact end |
.names ⇒ Object
The names of all workflows currently loaded
33 34 35 36 |
# File 'lib/cellect/server/workflow.rb', line 33 def self.names actor_names = Celluloid.actor_system.registry.names.collect &:to_s actor_names.select{ |key| workflow_names[key] } end |
Instance Method Details
#add(opts = { }) ⇒ Object
Adds or updates a subject
Accepts a hash in the form:
subject_id: 1,
priority: 0.5 # (if the workflow is prioritized)
117 118 119 120 121 122 123 |
# File 'lib/cellect/server/workflow.rb', line 117 def add(opts = { }) if prioritized? subjects.add opts[:subject_id], opts[:priority] else subjects.add opts[:subject_id] end end |
#add_seen_for(user_id, *subject_ids) ⇒ Object
Add subjects to a users seen set
83 84 85 86 87 |
# File 'lib/cellect/server/workflow.rb', line 83 def add_seen_for(user_id, *subject_ids) [subject_ids].flatten.compact.each do |subject_id| user(user_id).seen.add subject_id end end |
#can_reload_data? ⇒ Boolean
178 179 180 181 182 183 184 185 186 |
# File 'lib/cellect/server/workflow.rb', line 178 def can_reload_data? if SKIP_RELOAD_STATES.include?(self.state) false elsif can_reload_at.nil? true else can_reload_at <= Time.now end end |
#data_loader ⇒ Object
192 193 194 |
# File 'lib/cellect/server/workflow.rb', line 192 def data_loader Loader.new(self) end |
#grouped? ⇒ Boolean
143 144 145 |
# File 'lib/cellect/server/workflow.rb', line 143 def grouped? false end |
#load_data ⇒ Object
Loads subjects from the adapter
54 55 56 57 58 |
# File 'lib/cellect/server/workflow.rb', line 54 def load_data return if [:loading, :ready ].include? state self.state = :loading data_loader.async.load_data end |
#pairwise? ⇒ Boolean
135 136 137 |
# File 'lib/cellect/server/workflow.rb', line 135 def pairwise? !!pairwise end |
#prioritized? ⇒ Boolean
139 140 141 |
# File 'lib/cellect/server/workflow.rb', line 139 def prioritized? !!prioritized end |
#ready? ⇒ Boolean
147 148 149 |
# File 'lib/cellect/server/workflow.rb', line 147 def ready? state == :ready end |
#reload_data ⇒ Object
Reloads subjects from the adapter
61 62 63 64 65 66 67 |
# File 'lib/cellect/server/workflow.rb', line 61 def reload_data if can_reload_data? self.state = :reloading reload_set = subjects.class.new data_loader.async.reload_data(reload_set) end end |
#remove(opts = { }) ⇒ Object
Removes a subject
Accepts a hash in the form:
subject_id: 1
131 132 133 |
# File 'lib/cellect/server/workflow.rb', line 131 def remove(opts = { }) subjects.remove opts[:subject_id] end |
#remove_user(user_id) ⇒ Object
Unload a user
90 91 92 93 |
# File 'lib/cellect/server/workflow.rb', line 90 def remove_user(user_id) removed = self.users.delete user_id removed.terminate if removed end |
#sample(opts = { }) ⇒ Object
Get a sample of subjects for a user
Accepts a hash in the form:
{
user_id: 123,
limit: 5
}
102 103 104 105 106 107 108 |
# File 'lib/cellect/server/workflow.rb', line 102 def sample(opts = { }) if opts[:user_id] unseen_for opts[:user_id], limit: opts[:limit] else subjects.sample opts[:limit] end end |
#set_klass ⇒ Object
Looks up the set class
161 162 163 |
# File 'lib/cellect/server/workflow.rb', line 161 def set_klass @set_klass ||= SET_KLASS[[prioritized, pairwise]] end |
#set_reload_at_time(time_stamp = Time.now + RELOAD_TIMEOUT) ⇒ Object
188 189 190 |
# File 'lib/cellect/server/workflow.rb', line 188 def set_reload_at_time(time_stamp=Time.now + RELOAD_TIMEOUT) self.can_reload_at = time_stamp end |
#status ⇒ Object
General information about this workflow
166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/cellect/server/workflow.rb', line 166 def status { name: name, state: state, grouped: false, prioritized: prioritized, pairwise: pairwise, subjects: subjects.size, users: users.length } end |
#unseen_for(user_id, limit: 5) ⇒ Object
Get unseen subjects for a user
78 79 80 |
# File 'lib/cellect/server/workflow.rb', line 78 def unseen_for(user_id, limit: 5) subjects.subtract user(user_id).seen, limit end |
#user(id) ⇒ Object
Look up and/or load a user
70 71 72 73 74 75 |
# File 'lib/cellect/server/workflow.rb', line 70 def user(id) self.users[id] ||= User.supervise id, workflow_name: name user = self.users[id].actors.first user.load_data user end |