Module: Taskinator::Definition

Defined in:
lib/taskinator/definition.rb

Defined Under Namespace

Classes: ProcessAlreadyDefinedError, ProcessUndefinedError

Constant Summary collapse

UndefinedProcessError =

for backward compatibility

ProcessUndefinedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



87
88
89
# File 'lib/taskinator/definition.rb', line 87

# Reader for the +@queue+ instance variable.
# Returns nil when the variable has not been assigned.
def queue
  instance_variable_get(:@queue)
end

Instance Method Details

#create_process(*args) ⇒ Object

Creates an instance of the process. NOTE: the supplied arguments (args) are serialized and ultimately passed to each method of the defined process.



93
94
95
96
# File 'lib/taskinator/definition.rb', line 93

# Creates a process from this definition.
#
# Runs assert_valid_process_module first, then delegates to
# +_create_process_+ with the subprocess flag set to false, forwarding
# every supplied argument unchanged. Returns whatever +_create_process_+
# returns.
def create_process(*args)
  assert_valid_process_module

  as_subprocess = false
  _create_process_(as_subprocess, *args)
end

#create_process_remotely(*args) ⇒ Object

Returns the UUID of the process to be created. The process can be retrieved with this UUID by calling Taskinator::Process.fetch(uuid).



103
104
105
106
107
108
109
110
# File 'lib/taskinator/definition.rb', line 103

# Enqueues the creation of a process instead of building it inline.
#
# After assert_valid_process_module, a fresh uuid is generated and handed,
# together with this definition and the argument list, to
# Taskinator.queue.enqueue_create_process. Returns the generated uuid.
def create_process_remotely(*args)
  assert_valid_process_module

  Taskinator.generate_uuid.tap do |process_uuid|
    Taskinator.queue.enqueue_create_process(self, process_uuid, args)
  end
end

#create_sub_process(*args) ⇒ Object



112
113
114
115
# File 'lib/taskinator/definition.rb', line 112

# Creates a process from this definition, flagged as a sub process.
#
# Runs assert_valid_process_module first, then delegates to
# +_create_process_+ with the subprocess flag set to true, forwarding
# every supplied argument unchanged.
def create_sub_process(*args)
  assert_valid_process_module

  as_subprocess = true
  _create_process_(as_subprocess, *args)
end

#define_concurrent_process(*arg_list, &block) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/taskinator/definition.rb', line 22

# Defines a concurrent process.
#
# Builds a factory lambda and appends it to arg_list before delegating to
# define_process. When invoked, the factory removes :complete_on from the
# options hash it is given (falling back to CompleteOn::Default) and calls
# Process.define_concurrent_process_for with the remaining options.
def define_concurrent_process(*arg_list, &block)
  concurrent_factory = lambda do |definition, options|
    completion = options.delete(:complete_on) || CompleteOn::Default
    Process.define_concurrent_process_for(definition, completion, options)
  end

  define_process(*arg_list, concurrent_factory, &block)
end

#define_process(*arg_list, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/taskinator/definition.rb', line 30

# Defines the process for this definition by installing a singleton
# +_create_process_+ method, which the "create_process" methods call.
#
# arg_list describes the arguments expected at creation time. If its last
# element responds to +call+, that element is removed and used as the
# process factory; otherwise a sequential process factory is used. The
# block is instance_eval'd against a Builder every time a process is
# created.
#
# Raises ProcessAlreadyDefinedError when +_create_process_+ already exists.
def define_process(*arg_list, &block)
  raise ProcessAlreadyDefinedError if respond_to?(:_create_process_)

  factory =
    if arg_list.last.respond_to?(:call)
      arg_list.pop
    else
      lambda do |definition, options|
        Process.define_sequential_process_for(definition, options)
      end
    end

  # called from respective "create_process" methods
  # parameters can contain options as the last parameter
  define_singleton_method(:_create_process_) do |subprocess, *args|
    begin

      # TODO: better validation of arguments

      # FIXME: arg_list should only contain an array of symbols

      if args.length < arg_list.length
        raise ArgumentError, "wrong number of arguments (#{args.length} for #{arg_list.length})"
      end

      # a trailing Hash is used as the options and is updated in place,
      # so the Builder below receives the same (defaulted) hash via args
      trailing = args.last
      options = trailing.is_a?(Hash) ? trailing : {}
      options[:scope] ||= :shared

      process = factory.call(self, options)

      # this may take long... up to users definition
      Taskinator.instrumenter.instrument('taskinator.process.created', uuid: process.uuid, state: :initial) do
        Builder.new(process, self, *args).instance_eval(&block)
      end

      # only save "root processes"
      unless subprocess
        # instrument separately
        Taskinator.instrumenter.instrument('taskinator.process.saved', uuid: process.uuid, state: :initial) do
          # this will visit "sub processes" and persist them too
          process.save

          # add it to the list of "root processes"
          Persistence.add_process_to_list(process)
        end
      end

      # this is the "root" process
      process

    rescue => error
      # log, then re-raise the same exception to the caller
      Taskinator.logger.error(error)
      Taskinator.logger.debug(error.backtrace)
      raise
    end
  end
end

#define_sequential_process(*arg_list, &block) ⇒ Object

Defines a sequential process.



15
16
17
18
19
20
# File 'lib/taskinator/definition.rb', line 15

# Defines a sequential process.
#
# Builds a factory lambda that calls Process.define_sequential_process_for
# with the definition and options it is given, appends it to arg_list and
# delegates to define_process.
def define_sequential_process(*arg_list, &block)
  sequential_factory = lambda do |definition, options|
    Process.define_sequential_process_for(definition, options)
  end

  define_process(*arg_list, sequential_factory, &block)
end