Class: Cellect::Server::Workflow

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/cellect/server/workflow.rb

Direct Known Subclasses

GroupedWorkflow

Constant Summary collapse

# Workflow states in which can_reload_data? refuses a reload.
SKIP_RELOAD_STATES =
[ :reloading, :loading, :initializing ].freeze
# Number of seconds set_reload_at_time adds to Time.now by default; read from
# the RELOAD_TIMEOUT environment variable, falling back to 600 when it is unset.
RELOAD_TIMEOUT =
ENV.fetch('RELOAD_TIMEOUT', 600).to_i.freeze
SET_KLASS =

Provide a lookup for matching sets to workflow criteria

{
  # Keys are [prioritized, pairwise] flag pairs, in the order set_klass builds them.
  [    false, false  ] => DiffSet::RandomSet,
  [    false, true   ] => DiffSet::PairwiseRandomSet,
  [     true, false  ] => DiffSet::PrioritySet,
  [     true, true   ] => DiffSet::PairwisePrioritySet
}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, pairwise: false, prioritized: false) ⇒ Workflow

Sets up a new workflow and starts the data loading



44
45
46
47
48
49
50
51
# File 'lib/cellect/server/workflow.rb', line 44

# Builds a workflow in its initial :initializing state.
#
# @param name the identifier stored as this workflow's name
# @param pairwise coerced to a strict boolean before being stored
# @param prioritized coerced to a strict boolean before being stored
def initialize(name, pairwise: false, prioritized: false)
  self.name        = name
  self.users       = {}
  self.pairwise    = pairwise ? true : false
  self.prioritized = prioritized ? true : false
  # Both flags must be assigned before set_klass is consulted, because the
  # set class is looked up from them.
  self.subjects ||= set_klass.new
  self.state = :initializing
end

Class Attribute Details

.workflow_names ⇒ Object

Returns the value of attribute workflow_names.



7
8
9
# File 'lib/cellect/server/workflow.rb', line 7

# Reader for @workflow_names. Elsewhere in this class it is indexed by
# workflow name (.[]= stores `true` under each registered name).
def workflow_names
  @workflow_names
end

Instance Attribute Details

#can_reload_at ⇒ Object

Returns the value of attribute can_reload_at.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @can_reload_at, the timestamp can_reload_data? compares against
# Time.now (nil means no restriction has been set).
def can_reload_at
  @can_reload_at
end

#name ⇒ Object

Returns the value of attribute name.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @name, the workflow's identifier.
def name
  @name
end

#pairwise ⇒ Object

Returns the value of attribute pairwise.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @pairwise; see pairwise? for the strict-boolean form.
def pairwise
  @pairwise
end

#prioritized ⇒ Object

Returns the value of attribute prioritized.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @prioritized; see prioritized? for the strict-boolean form.
def prioritized
  @prioritized
end

#state ⇒ Object

Returns the value of attribute state.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @state. Values assigned within this class are the symbols
# :initializing, :loading and :reloading; ready? checks for :ready.
def state
  @state
end

#subjects ⇒ Object

Returns the value of attribute subjects.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @subjects, the set object that add, remove, sample and
# unseen_for delegate to.
def subjects
  @subjects
end

#users ⇒ Object

Returns the value of attribute users.



11
12
13
# File 'lib/cellect/server/workflow.rb', line 11

# Reader for @users, a hash keyed by user id (see user and remove_user).
def users
  @users
end

Class Method Details

.[](name) ⇒ Object

Look up and/or load a workflow



18
19
20
21
22
23
# File 'lib/cellect/server/workflow.rb', line 18

# Looks up the workflow registered under +name+, asking the adapter to load
# it first when nothing is registered yet.
#
# @param name the registry key of the workflow
# @return the first actor of the registered entry, or nil when the workflow
#   is still not registered after the load attempt
def self.[](name)
  Cellect::Server.adapter.load_workflows(name) unless Actor[name]

  supervisor = Actor[name]
  return unless supervisor

  supervisor.actors.first
end

.[]=(name, opts) ⇒ Object

Load a workflow



26
27
28
29
30
# File 'lib/cellect/server/workflow.rb', line 26

# Supervises a new workflow and registers it under +name+.
#
# @param name the registry key for the workflow
# @param opts a hash whose 'pairwise' and 'prioritized' string keys are
#   forwarded to the supervised workflow
# @return whatever the registry holds under +name+ afterwards
def self.[]=(name, opts)
  supervisor = supervise(name, pairwise: opts['pairwise'], prioritized: opts['prioritized'])
  Actor[name] = supervisor

  # Only record the name once the registry actually holds an entry for it.
  Workflow.workflow_names[name] = true if Actor[name]

  Actor[name]
end

.all ⇒ Object

All currently loaded workflows



39
40
41
# File 'lib/cellect/server/workflow.rb', line 39

# Resolves every name returned by .names through Workflow[] and drops the
# entries that come back nil.
#
# @return [Array] the resolved workflows
def self.all
  resolved = names.map { |workflow_name| Workflow[workflow_name] }
  resolved.compact
end

.names ⇒ Object

The names of all workflows currently loaded



33
34
35
36
# File 'lib/cellect/server/workflow.rb', line 33

# Lists the registered actor names (as strings) that are also recorded in
# workflow_names.
#
# @return [Array<String>]
def self.names
  registered = Celluloid.actor_system.registry.names.map(&:to_s)
  registered.select { |actor_name| workflow_names[actor_name] }
end

Instance Method Details

#add(opts = { }) ⇒ Object

Adds or updates a subject

Accepts a hash in the form:

subject_id: 1,
priority: 0.5  # (if the workflow is prioritized)



117
118
119
120
121
122
123
# File 'lib/cellect/server/workflow.rb', line 117

# Adds or updates a subject in the subject set.
#
# @param opts [Hash] reads :subject_id, and :priority when the workflow is
#   prioritized
# @return whatever subjects.add returns
def add(opts = { })
  subject_id = opts[:subject_id]
  return subjects.add(subject_id, opts[:priority]) if prioritized?

  subjects.add(subject_id)
end

#add_seen_for(user_id, *subject_ids) ⇒ Object

Add subjects to a user's seen set



83
84
85
86
87
# File 'lib/cellect/server/workflow.rb', line 83

# Records each given subject id in the user's seen set.
#
# Nested arrays are flattened and nil entries are skipped. The user is
# looked up once per subject id, so nothing is looked up when the list is empty.
#
# @param user_id the id passed to #user
# @param subject_ids subject ids, possibly nested in arrays
# @return [Array] the flattened, nil-free list of ids that were added
def add_seen_for(user_id, *subject_ids)
  subject_ids.flatten.compact.each { |seen_id| user(user_id).seen.add(seen_id) }
end

#can_reload_data? ⇒ Boolean

Returns:

  • (Boolean)


178
179
180
181
182
183
184
185
186
# File 'lib/cellect/server/workflow.rb', line 178

# Whether a reload may start now.
#
# A reload is refused while the state is one of SKIP_RELOAD_STATES.
# Otherwise it is allowed when no can_reload_at time is set, or when that
# time has already passed.
#
# @return [Boolean]
def can_reload_data?
  return false if SKIP_RELOAD_STATES.include?(state)

  can_reload_at.nil? || can_reload_at <= Time.now
end

#data_loader ⇒ Object



192
193
194
# File 'lib/cellect/server/workflow.rb', line 192

# Builds a new Loader for this workflow. A fresh instance is created on every
# call; nothing is memoized.
def data_loader
  Loader.new(self)
end

#grouped? ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/cellect/server/workflow.rb', line 143

# Always false for this class.
# NOTE(review): presumably overridden by GroupedWorkflow — confirm.
def grouped?
  false
end

#load_data ⇒ Object

Loads subjects from the adapter



54
55
56
57
58
# File 'lib/cellect/server/workflow.rb', line 54

# Starts loading subjects unless a load is already running or has finished.
#
# When the state is :loading or :ready nothing happens and nil is returned.
# Otherwise the state becomes :loading and the load is handed to a new data
# loader via its async interface.
def load_data
  case state
  when :loading, :ready
    nil
  else
    self.state = :loading
    data_loader.async.load_data
  end
end

#pairwise? ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/cellect/server/workflow.rb', line 135

# Strict-boolean form of the pairwise flag.
#
# @return [Boolean] true for any truthy value, false for nil or false
def pairwise?
  pairwise ? true : false
end

#prioritized? ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/cellect/server/workflow.rb', line 139

# Strict-boolean form of the prioritized flag.
#
# @return [Boolean] true for any truthy value, false for nil or false
def prioritized?
  prioritized ? true : false
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/cellect/server/workflow.rb', line 147

# True only when the current state is :ready.
def ready?
  state == :ready
end

#reload_data ⇒ Object

Reloads subjects from the adapter



61
62
63
64
65
66
67
# File 'lib/cellect/server/workflow.rb', line 61

# Starts reloading subjects into a fresh set, if a reload is allowed now.
#
# Returns nil without doing anything when can_reload_data? is false.
# Otherwise the state becomes :reloading, an empty set of the same class as
# the current subjects is created, and it is handed to a new data loader via
# its async interface.
def reload_data
  return unless can_reload_data?

  self.state = :reloading
  fresh_set = subjects.class.new
  data_loader.async.reload_data(fresh_set)
end

#remove(opts = { }) ⇒ Object

Removes a subject

Accepts a hash in the form:

subject_id: 1



131
132
133
# File 'lib/cellect/server/workflow.rb', line 131

# Removes a subject from the subject set.
#
# @param opts [Hash] reads :subject_id
# @return whatever subjects.remove returns
def remove(opts = { })
  target_set = subjects
  subject_id = opts[:subject_id]
  target_set.remove(subject_id)
end

#remove_user(user_id) ⇒ Object

Unload a user



90
91
92
93
# File 'lib/cellect/server/workflow.rb', line 90

# Drops the entry stored for +user_id+ and terminates it.
#
# @param user_id key into the users hash
# @return the result of terminate, or nil when no entry was stored
def remove_user(user_id)
  supervisor = users.delete(user_id)
  return unless supervisor

  supervisor.terminate
end

#sample(opts = { }) ⇒ Object

Get a sample of subjects for a user

Accepts a hash in the form:

{
  user_id: 123,
  limit: 5
}


102
103
104
105
106
107
108
# File 'lib/cellect/server/workflow.rb', line 102

# Draws a sample of subjects.
#
# With a :user_id the request goes to unseen_for, so it is restricted to
# subjects that user has not seen. Without one it samples the whole set.
# opts[:limit] is forwarded as given (nil when absent).
#
# @param opts [Hash] reads :user_id and :limit
def sample(opts = { })
  user_id = opts[:user_id]
  return subjects.sample(opts[:limit]) unless user_id

  unseen_for(user_id, limit: opts[:limit])
end

#set_klass ⇒ Object

Looks up the set class



161
162
163
# File 'lib/cellect/server/workflow.rb', line 161

# Looks up, and caches in @set_klass, the set class matching this workflow's
# [prioritized, pairwise] flags in SET_KLASS.
def set_klass
  return @set_klass if @set_klass

  @set_klass = SET_KLASS[[prioritized, pairwise]]
end

#set_reload_at_time(time_stamp = Time.now + RELOAD_TIMEOUT) ⇒ Object



188
189
190
# File 'lib/cellect/server/workflow.rb', line 188

# Stores the earliest time at which can_reload_data? will allow a reload again.
#
# @param time_stamp defaults to RELOAD_TIMEOUT seconds after the current time
def set_reload_at_time(time_stamp=Time.now + RELOAD_TIMEOUT)
  self.can_reload_at = time_stamp
end

#status ⇒ Object

General information about this workflow



166
167
168
169
170
171
172
173
174
175
176
# File 'lib/cellect/server/workflow.rb', line 166

# Summarises this workflow as a hash.
#
# The :grouped entry comes from grouped? rather than a hard-coded false, so a
# subclass that overrides grouped? reports the right value without having to
# override status as well. For this class grouped? is false, so the result is
# unchanged.
#
# @return [Hash] with keys :name, :state, :grouped, :prioritized, :pairwise,
#   :subjects (the subject set's size) and :users (number of user entries)
def status
  {
    name: name,
    state: state,
    grouped: grouped?,
    prioritized: prioritized,
    pairwise: pairwise,
    subjects: subjects.size,
    users: users.length
  }
end

#unseen_for(user_id, limit: 5) ⇒ Object

Get unseen subjects for a user



78
79
80
# File 'lib/cellect/server/workflow.rb', line 78

# Asks the subject set for subjects the user has not seen yet.
#
# @param user_id the id passed to #user
# @param limit forwarded as the second argument to subjects.subtract
# @return whatever subjects.subtract returns
def unseen_for(user_id, limit: 5)
  # Take the subject set first, then resolve the user, matching the order in
  # which the receiver and arguments are evaluated.
  pool = subjects
  already_seen = user(user_id).seen
  pool.subtract(already_seen, limit)
end

#user(id) ⇒ Object

Look up and/or load a user



70
71
72
73
74
75
# File 'lib/cellect/server/workflow.rb', line 70

# Returns the user actor for +id+, supervising a new User on first access.
#
# load_data is called on the actor on every lookup, not only the first.
#
# @param id key into the users hash, also passed to User.supervise
# @return the first actor of the stored supervisor entry
def user(id)
  supervisor = (users[id] ||= User.supervise(id, workflow_name: name))
  actor = supervisor.actors.first
  actor.load_data
  actor
end