Class: Opm::OpmProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/opm.rb

Overview

Main process class

class Sum < OpmProcess
  input :input1, :input2
  output :output1

  def run
    @output1 = @input1 + @input2
  end
end

proc = Sum.process :input1 => 1, :input2 => 12
proc.wait
puts proc.output1 # 13

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.input(*args) ⇒ Object

Define input variables. In the process if you use

include :foo, :bar

then you can use them as @foo, @bar



163
164
165
166
167
168
169
# File 'lib/opm.rb', line 163

def self.input(*args)
   @input_arguments ||= []
   @input_arguments += args
   args.each do |arg|
     instance_variable_set("@#{arg}", nil)
   end
end

.output(*args) ⇒ Object

Outputs of the process

class SampleProcess < OpmProcess
  input :input
  output :foo
  ...
end

bar = SampleProcess.process :input => "something"
puts bar.foo


182
183
184
185
186
187
188
189
# File 'lib/opm.rb', line 182

def self.output(*args)
  @output_arguments ||= []
  @output_arguments += args
  args.each do |arg|
    instance_variable_set("@#{arg}", nil)
    class_eval %(class << self; attr_reader :#{arg} end)
  end
end

.process(args = {}) ⇒ Object

Start the process



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/opm.rb', line 198

def self.process(args={})
   
  if @remote != nil
    #It's not okay yet
    object = DRb::DRbObject.new object, "druby://#{@remote[:host]}:#{@remote[:port]}" 
  else
    object = new
  end
   
  @input_arguments.each do |arg|
#     throw OpmException::NoInput.new(object) unless args.has_key? arg
    if args[arg].class == TemporaryProcessObject
      object.underProcessing = [arg, args[arg]]
      next
    end
    object.instance_variable_set("@#{arg}", args[arg])
   end
   
   @output_arguments.each do |arg|
     object.instance_variable_set("@#{arg}", args[arg])
     object.outputsForWait = arg
    class_eval %( class << object; def #{arg}
           return @#{arg} unless running?
            return TemporaryProcessObject.new(self, "#{arg}")
        end 
       end
      )
   end
  object.start_process
  return object
end

.remote(args) ⇒ Object

:nodoc:



191
192
193
# File 'lib/opm.rb', line 191

def self.remote(args) #:nodoc:
  @remote = args
end

Instance Method Details

#afterObject

Similar to before, but this is checking the result of the process



299
300
301
# File 'lib/opm.rb', line 299

def after
  true
end

#beforeObject

Before filter for process. It’s a logical function. If the function returns fault, the process calls the onBeforeError, where we can set the parameters. For more information about the cause of the error, use @error variable in onBeforeError.



293
294
295
# File 'lib/opm.rb', line 293

def before
  true
end

#onAfterErrorObject

Look: onBeforeError.



310
311
# File 'lib/opm.rb', line 310

def onAfterError
end

#onBeforeErrorObject

Callback function. It will signaled if before is false



305
306
# File 'lib/opm.rb', line 305

def onBeforeError
end

#onlyOne(args, &block) ⇒ Object

Secure only one process be running in a block.



315
316
317
318
319
320
321
# File 'lib/opm.rb', line 315

def onlyOne(args, &block)
    @@Mutex ||= Hash.new
    @@Mutex[args] = Mutex.new if @@Mutex[args] == nil
    @@Mutex[args].synchronize do
      yield
    end
end

#outputsForWait=(val) ⇒ Object

For set the output attributes, for wait_output_done



329
330
331
# File 'lib/opm.rb', line 329

def outputsForWait=( val ) #:nodoc:
  @outputsForWait << val
end

#running?Boolean

Check thread is running

Returns:

  • (Boolean)


368
369
370
# File 'lib/opm.rb', line 368

def running?
  return @thread.alive?
end

#start_processObject

:nodoc:



333
334
335
# File 'lib/opm.rb', line 333

def start_process #:nodoc:
  @thread.run
end

#underProcessing=(val) ⇒ Object

:nodoc:



323
324
325
# File 'lib/opm.rb', line 323

def underProcessing=( val ) #:nodoc:
  @underProcessing << val
end

#waitObject



372
373
374
# File 'lib/opm.rb', line 372

def wait
  @thread.join
end