Class: Pione::RuleEngine::RuleApplication

Inherits:
SimpleDelegator
  • Object
show all
Defined in:
lib/pione/rule-engine/flow-handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(handler) ⇒ RuleApplication

Returns a new instance of RuleApplication.



175
176
177
178
179
# File 'lib/pione/rule-engine/flow-handler.rb', line 175

def initialize(handler)
  super(handler)
  @data_finder = DataFinder.new(tuple_space_server, domain_id)
  @finished = Set.new # finished tuple cache
end

Instance Method Details

#apply(rules) ⇒ Object

Apply input data to rules.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/pione/rule-engine/flow-handler.rb', line 182

def apply(rules)
  # start message
  user_message_begin("Rule Application: %s" % digest, 1)

  # with profile
  Util::Profiler.profile(Util::RuleApplicationProfileReport.new(digest)) do
    # rule application loop
    while tasks = find_tasks(rules) do
      # save previous finished tuples's number
      size = @finished.size

      # distribute tasks
      distribute_tasks(tasks)

      # check whether tasks have been processed
      break unless size < @finished.size
    end
  end

  # end message
  user_message_end("Rule Application: %s" % digest, 1)
end

#check_updatability(task) ⇒ Object

Check updatability of the task and get update order.



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/pione/rule-engine/flow-handler.rb', line 321

def check_updatability(task)
  # read all tuples of data-null
  data_null_tuples = read_all(TupleSpace::DataNullTuple.new(domain: task.domain_id))

  res = []

  f = lambda do |task_env, outputs|
    # make parameter set for the task
    table = Hash.new

    if val_i = task_env.variable_get!(Lang::Variable.new("O"))
      table["OUTPUT"] = Lang::Variable.new("O")
      table["O"] = val_i
    end

    task_param_set = task.param_set.set(table: task.param_set.table.merge(table))

    # check update criterias
    order = UpdateCriteria.order(task_env, task.rule_condition, task.inputs, outputs, data_null_tuples)
    res << [order, task_env, task_param_set]
  end

  # find output data combination
  @data_finder.find(:output, task.rule_condition.outputs, task.env, &f)
  f.call(task.env, []) if res.empty?

  # evaluate the result
  groups = res.group_by {|(order, _, _)| order}
  if f = groups[:force] or f = groups[:weak]
    order, env, param_set = f.first

    # setup output variables
    var_o = Lang::Variable.new("O")
    task.env.variable_set(Lang::Variable.new("OUTPUT"), var_o)
    kseq = find_output_variables(task, Lang::KeyedSequence.new)
    task.env.variable_set(var_o, kseq)
    param_set = param_set.set(table: param_set.table.merge({"O" => kseq}))

    return task.set(order: order, env: env, param_set: param_set)
  else
    return nil
  end
end

#distribute_tasks(tasks) ⇒ Object

Distribute tasks.



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/pione/rule-engine/flow-handler.rb', line 379

def distribute_tasks(tasks)
  # log and message
  process_log(make_task_process_record.merge(transition: "suspend"))
  process_log(make_rule_process_record.merge(transition: "suspend"))
  user_message_begin("Distribution: %s" % digest, 2)

  # distribute tasks
  tasks.each do |task|
    tuple = task.make_tuple(domain_id)

    # publish tasks
    if need_to_publish_task?(task, tuple)
      # clear finished tuple and data tuples from the domain
      take!(TupleSpace::FinishedTuple.new(domain: task.domain_id))
      take_all!(TupleSpace::DataTuple.new(domain: task.domain_id))

      # copy input data from this domain to the task domain
      task.inputs.flatten.each {|input| copy_data_into_domain(input, task.domain_id)}

      # write the task
      write(tuple)

      # log and message
      process_log(task.make_task_process_record.merge(transition: "schedule"))
      user_message(">>> %s".color(:yellow) % task.digest, 3, "", :blue)
    else
      # cancel the task
      Log::Debug.rule_engine "task %s canceled at %s" % [task.digest, digest]
    end
  end

  # wait an end of distributed tasks
  wait_task_completion(tasks)

  # turn foreground if the task is background
  unless read!(TupleSpace::ForegroundTuple.new(domain_id, digest))
    write(TupleSpace::ForegroundTuple.new(domain_id, digest))
  end

  # log and message
  process_log(make_rule_process_record.merge(transition: "resume"))
  process_log(make_task_process_record.merge(transition: "resume"))
  user_message_end("Distribution: %s" % digest, 2)
end

#find_applicable_rules(rules) ⇒ Object

Find applicable rules. The criterion of applicable rule is that the rule satisfies ticket conditions or not.



219
220
221
222
223
224
225
226
# File 'lib/pione/rule-engine/flow-handler.rb', line 219

def find_applicable_rules(rules)
  # select rules which ticktes exist in this domain
  rules.select do |rule|
    rule.input_tickets.pieces.all? do |ticket|
      read!(TupleSpace::TicketTuple.new(domain_id, ticket.name))
    end
  end
end

#find_output_variables(task, kseq) ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/pione/rule-engine/flow-handler.rb', line 365

def find_output_variables(task, kseq)
  _kseq = kseq
  task.rule_condition.outputs.each_with_index do |condition, i|
    begin
      data = condition.eval(task.env)
      _kseq = _kseq.put(Lang::IntegerSequence.of(i+1), data)
    rescue Lang::UnboundError
      next
    end
  end
  return _kseq
end

#find_tasks(rules) ⇒ Object

Find applicable and updatable rule applications.



206
207
208
209
210
211
212
213
214
215
# File 'lib/pione/rule-engine/flow-handler.rb', line 206

def find_tasks(rules)
  # select applicable rules
  applicable_rules = find_applicable_rules(rules)

  # make task
  tasks = make_tasks(applicable_rules)

  # be careful that returns nil when tasks are empty
  tasks.empty? ? nil : tasks
end

#find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set) ⇒ Object

Handle parameter distribution. Rule parameters with each modifier are distributed tasks by each element.



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/pione/rule-engine/flow-handler.rb', line 278

def find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set)
  tasks = []

  # find input data combinations
  @data_finder.find(:input, rule_condition.inputs, env) do |task_env, inputs|
    # make parameter set for the task
    table = Hash.new

    if val_i = task_env.variable_get!(Lang::Variable.new("I"))
      table["INPUT"] = Lang::Variable.new(name: "I", package_id: rule.package_id)
      table["I"] = val_i
    end

    if val_star = task_env.variable_get!(Lang::Variable.new("*"))
      table["*"] = val_star
    end

    task_param_set = param_set.set(table: param_set.table.merge(table))

    # check constraint conditions
    next unless rule_condition.constraints.all? do |constraint|
      res = constraint.eval(task_env)
      if res.is_a?(Lang::BooleanSequence)
        res.value
      else
        raise Lang::StructuralError.new(Lang::BooleanSequence, constraint.pos)
      end
    end

    # make task
    domain_id = Util::DomainID.generate(rule.package_id, rule.name, inputs, task_param_set)
    task = Task.new(task_env, domain_id, rule, rule_definition, rule_condition, inputs, task_param_set)

    # check updatability
    if _task = check_updatability(task)
      tasks << _task
    end
  end

  return tasks
end

#import_outputs_of_task(task, finished) ⇒ Object

Import finished tuple's outputs from the domain.



475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
# File 'lib/pione/rule-engine/flow-handler.rb', line 475

def import_outputs_of_task(task, finished)
  finished.outputs.each_with_index do |output, i|
    data_expr = task.rule_condition.outputs[i].eval(task.env)
    output = [output] unless output.kind_of?(Array)

    case data_expr.operation
    when :write
      output.each {|o| copy_data_into_domain(o, domain_id)}
    when :remove
      output.each {|o| remove_data_from_domain(o, domain_id)}
    when :touch
      output.each {|o| touch_data_in_domain(o, domain_id)}
    else
      # here is unreached
      raise RuleExecutionError.new(self)
    end
  end
end

#lift_touch_tuple(task) ⇒ Object

Lift effects of touch operations from the task domain to this domain.



495
496
497
498
499
500
501
502
503
504
505
# File 'lib/pione/rule-engine/flow-handler.rb', line 495

def lift_touch_tuple(task)
  read_all(TupleSpace::TouchTuple.new(domain: task.domain_id)).each do |touch|
    if target = read!(TupleSpace::DataTuple.new(name: touch.name, domain: domain_id))
      # update time of data tuple
      write(target.tap {|x| x.time = touch.time}) unless target.time > touch.time

      # lift touch tuple to upper domain
      write(touch.tap{|x| x.domain = domain_id})
    end
  end
end

#make_tasks(rules) ⇒ Object

Make tasks from rules.



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/pione/rule-engine/flow-handler.rb', line 229

def make_tasks(rules)
  rules.each_with_object([]) do |rule, tasks|
    # set handler's package id if rule's package id is implicit
    rule = rule.set(package_id: package_id) unless rule.package_id

    # get rule definition
    rule_definition = env.rule_get(rule)

    # handle parameter sequence
    pieces = rule.param_sets.pieces
    if not(pieces.empty?)
      pieces.each do |param_set|
        ### merge default parameter values ####
        # setup task's environment by parameter set
        _env = plain_env.layer.merge_param_set(param_set)
        _env.set(current_package_id: rule.package_id || env.current_package_id)

        # get task's condition
        rule_condition = rule_definition.rule_condition_context.eval(_env)

        # merge default values
        _param_set = param_set.merge_default_values(rule_condition)

        # handle parameter distribution
        _param_set.eval(_env).expand do |expanded_param_set|
          # rebuild environment by expanded param set
          _env = plain_env.layer.merge_param_set(expanded_param_set)
          _env.set(current_package_id: rule.package_id || env.current_package_id)

          # get task's condition
          rule_condition = rule_definition.rule_condition_context.eval(_env)

          tasks.concat find_tasks_by_rule_condition(_env, rule, rule_definition, rule_condition, expanded_param_set).uniq
        end
      end
    else
      _env = plain_env.layer
      # get task's condition
      rule_condition = rule_definition.rule_condition_context.eval(_env)
      this_tasks = find_tasks_by_rule_condition(
        _env, rule, rule_definition, rule_condition, Lang::ParameterSet.new
      ).uniq
      tasks.concat(this_tasks)
    end
  end
end

#need_to_publish_task?(task, tuple) ⇒ Boolean

Return true if we need to publish the task.

Returns:

  • (Boolean)


425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# File 'lib/pione/rule-engine/flow-handler.rb', line 425

def need_to_publish_task?(task, tuple)
  # reuse task finished result if order is weak update
  if task.order == :weak
    template = TupleSpace::FinishedTuple.new(domain: task.domain_id, status: :succeeded)
    if @finished.include?(template)
      return false
    end
    if finished = read!(template)
      unless @finished.any? {|t| t.uuid == finished.uuid}
        @finished << finished
      end
      return false
    end
  end

  # the task exists in space already, so we don't need to publish
  return false if read!(tuple)

  # another worker is working now, so we don't need to publish
  return false if read!(TupleSpace::WorkingTuple.new(domain: task.domain_id))

  # we need to publish the task
  return true
end

#wait_task_completion(tasks) ⇒ Object

Wait until tasks completed.



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/pione/rule-engine/flow-handler.rb', line 451

def wait_task_completion(tasks)
  tasks.each do |task|
    # wait to finish the distributed task, note that finished tuple is in
    # the task domain
    finished = read(TupleSpace::FinishedTuple.new(domain: task.domain_id))
    unless @finished.any? {|t| t.uuid == finished.uuid}
      @finished << finished
    end

    ### task completion processing ###
    # copy write operation data tuple from the task domain to this domain
    import_outputs_of_task(task, finished)

    # touch tuple
    lift_touch_tuple(task)

    # publish output tickets
    task.rule.output_tickets.pieces.each do |piece|
      write(TupleSpace::TicketTuple.new(domain_id, piece.name))
    end
  end
end