Class: RuleQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/cirrocumulus/rule_queue.rb

Defined Under Namespace

Classes: QueueEntry

Instance Method Summary collapse

Constructor Details

#initialize(ontology_instance) ⇒ RuleQueue

Returns a new instance of RuleQueue.



20
21
22
23
24
# File 'lib/cirrocumulus/rule_queue.rb', line 20

# Builds an empty rule queue bound to the given ontology instance.
#
# @param ontology_instance [Object] stored as-is; handed to rule code as its
#   first argument when queued rules are run
def initialize(ontology_instance)
  @ontology_instance = ontology_instance
  # Backing store for queued entries (and the :marker sentinel used while running).
  @queue = []
  # Guards access to @queue.
  @mutex = Mutex.new
end

Instance Method Details

#pop ⇒ Object



42
43
44
45
46
# File 'lib/cirrocumulus/rule_queue.rb', line 42

# Removes and returns the element at the head of the queue while holding the mutex.
#
# @return [Object, nil] the oldest queued element, or nil when the queue is empty
def pop
  # Array#shift already yields nil for an empty array.
  @mutex.synchronize { @queue.shift }
end

#push(entry) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/cirrocumulus/rule_queue.rb', line 26

# Enqueues a rule run unless an equal, not-yet-finished run is already queued.
#
# When the rule's options contain :for, the run is scheduled that many seconds
# from now and two debug lines are logged; otherwise it is queued without a
# scheduled time.
#
# @param entry [Object] run data responding to #rule (with #options and #name)
#   and #parameters
# @return [Array, nil] the underlying queue after appending, or nil when the
#   run was skipped as a duplicate
def push(entry)
  @mutex.synchronize do
    # The :marker sentinel may sit in the queue while rules are being run; skip it.
    duplicate = @queue.any? do |queued|
      queued != :marker && queued.state != :finished && queued.run_data == entry
    end
    return if duplicate

    rule = entry.rule
    return @queue.push(QueueEntry.new(entry)) unless rule.options.key?(:for)

    delay = rule.options[:for]
    run_at = Time.now + delay
    debug("Enqueued rule #{rule.name}#{entry.parameters.inspect}")
    debug("And will execute this rule after #{delay} sec (at #{run_at.strftime("%H:%M:%S %d.%m.%Y")})")
    @queue.push(QueueEntry.new(entry, run_at))
  end
end

#run_queued_rules ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/cirrocumulus/rule_queue.rb', line 54

# Runs the rules that were queued before this call started.
#
# A :marker sentinel is appended first and entries are popped until it comes
# back around, so anything appended afterwards (including delayed entries that
# are re-queued here) is left for a later call.
#
# For each popped entry:
# * entries whose state is :finished are dropped;
# * entries whose run_at lies in the future are put back on the queue;
# * the rule code is called with the ontology instance and the entry's params,
#   but only if none of the matched facts is flagged as deleted.
#
# A StandardError raised by rule code is reported through warn and does not
# stop the remaining entries. Non-StandardError exceptions (Interrupt,
# SystemExit, NoMemoryError, ...) are deliberately left to propagate.
#
# @return [nil]
def run_queued_rules
  # Mutate @queue only under the mutex, like pop/push/size do.
  @mutex.synchronize { @queue.push(:marker) }

  # A nil from pop means the queue was drained before the marker was seen
  # (e.g. it was emptied elsewhere); stop instead of calling methods on nil.
  until (entry = pop).nil? || entry == :marker
    next if entry.state == :finished

    run_at = entry.run_at
    if run_at && run_at > Time.now
      # Not due yet: goes behind the marker, so it is revisited on the next call.
      @mutex.synchronize { @queue.push(entry) }
      next
    end

    next unless entry.run_data.matched_facts.none?(&:is_deleted)

    begin
      debug "Executing #{entry.rule.name}#{entry.params.inspect}"
      entry.rule.code.call(@ontology_instance, entry.params)
      # NOTE(review): the value of this call is discarded; possibly a state
      # transition (marking the entry finished) was intended here — confirm.
      entry.state
    rescue StandardError => e
      warn "Exception while executing rule: %s\n%s" % [e.to_s, e.backtrace.to_s]
    end
  end
end

#size ⇒ Object



48
49
50
51
52
# File 'lib/cirrocumulus/rule_queue.rb', line 48

# Reports how many elements are currently held in the queue, read under the mutex.
#
# @return [Integer] number of elements in the underlying array
def size
  @mutex.synchronize { @queue.length }
end