Processor


Processor executes any DataProcessor you specify and logs the entire process using any number of loggers you need.

You may add your own observers to monitor background tasks, or even to email a generated report to the business.

Processor allows customisation of almost every part of it.

Contents

  1. Installation
  2. Requirements
  3. Usage
    1. Data processors
    2. Run modes
    3. Observers
    4. Processor Thread
  4. Compatibility
  5. Contributing
  6. Copyright

Installation

Add this line to your application's Gemfile:

gem 'processor'

And then execute:

bundle

Or install it yourself as:

gem install processor

Requirements

  1. Ruby 1.9
  2. RSpec 2 for testing

Usage

Data processors

The actual processing is done by a data processor provided by the end user.
In general, this processor should implement 2 methods:

  1. process(record)
  2. records

It is also recommended to implement a name method, because several observers require it. Inherit your data processor from NullProcessor to get default behavior out of the box.

See Processor::Example::Migration (example/migration.rb) for an example.
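As a rough illustration of the interface described above, here is a minimal sketch of a data processor written in plain Ruby. It does not load the gem, and the class name UpcaseNamesProcessor and its processed accessor are hypothetical; in a real project you would probably inherit from NullProcessor instead.

```ruby
# Plain-Ruby sketch of a data processor (hypothetical class, gem not loaded).
class UpcaseNamesProcessor
  attr_reader :processed

  def initialize(names)
    @names = names
    @processed = []
  end

  # A label for the processor; several observers are said to need it.
  def name
    "upcase_names"
  end

  # Everything that should be processed.
  def records
    @names
  end

  # Called once per record.
  def process(record)
    @processed << record.upcase
  end
end

processor = UpcaseNamesProcessor.new(%w[alice bob])
# What a successive run boils down to: call process for each record.
processor.records.each { |record| processor.process(record) }
p processor.processed # => ["ALICE", "BOB"]
```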

There are several predefined data processors you can reuse:

ArrayProcessor

The simplest one: you implement the process and records methods.

BatchProcessor

Fetches records in batches of a defined size.

It is based on a query method that is supposed to run a query against the database.

It is recommended to override the fetch_batch method, otherwise there is little reason to use batch processing. fetch_batch could be query.first(10) or query.page(next_page). See data/solr_pages_processor.rb and data/solr_processor.rb for examples.
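The idea behind fetch_batch can be sketched without the gem. The example below is not the gem's BatchProcessor: FakeQuery, its offset_limit method and the BatchedRecords class are hypothetical stand-ins that only show how records can be yielded lazily, one batch at a time.

```ruby
# Hypothetical stand-in for a database query: returns a slice of rows.
class FakeQuery
  def initialize(rows)
    @rows = rows
  end

  def offset_limit(offset, limit)
    @rows[offset, limit] || []
  end
end

# Sketch of batch fetching (assumed design, not the gem's implementation).
class BatchedRecords
  def initialize(query, batch_size)
    @query = query
    @batch_size = batch_size
  end

  # Lazily yields records, loading one batch at a time.
  def records
    Enumerator.new do |yielder|
      offset = 0
      loop do
        batch = fetch_batch(offset)
        break if batch.empty?
        batch.each { |record| yielder << record }
        offset += batch.size
      end
    end
  end

  private

  # The piece a real processor would override, e.g. with a paged query.
  def fetch_batch(offset)
    @query.offset_limit(offset, @batch_size)
  end
end

source = BatchedRecords.new(FakeQuery.new((1..7).to_a), 3)
p source.records.to_a # => [1, 2, 3, 4, 5, 6, 7]
```

Because records returns an Enumerator, a caller that stops early (for example with first(2)) never triggers the later batches.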

Other

See data/csv_processor.rb for running a migration from CSV files.

Run modes

Currently 2 run modes are supported:

Successive

It runs process for each record returned by the records method, one at a time.

It is recommended to call it through Processor::Thread:

Processor::Thread.new(migration).run_successive

Threads

It runs process for each record returned by the records method without waiting for the previous call to finish.

You can specify the number of threads by passing a number to the constructor:

Processor::ProcessRunner::Threads.new 5

It is recommended to call it through Processor::Thread:

Processor::Thread.new(migration).run_in_threads 5
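To make the threaded mode more concrete, here is a minimal sketch of processing records with a fixed number of worker threads. It is an assumption about how such a runner could work, not the gem's actual implementation; run_in_worker_threads and SquareProcessor are hypothetical names. Note that process must be thread-safe in this mode, which the sketch handles with a Mutex.

```ruby
require "thread" # Queue and Mutex (explicit require needed on Ruby 1.9)

# Hypothetical helper: process all records using thread_count workers.
def run_in_worker_threads(data_processor, thread_count)
  queue = Queue.new
  data_processor.records.each { |record| queue << record }
  thread_count.times { queue << :stop } # one stop marker per worker

  workers = Array.new(thread_count) do
    Thread.new do
      while (record = queue.pop) != :stop
        data_processor.process(record)
      end
    end
  end
  workers.each(&:join) # wait until every worker has finished
end

# Hypothetical data processor whose process method is thread-safe.
class SquareProcessor
  attr_reader :results

  def initialize(numbers)
    @numbers = numbers
    @results = []
    @lock = Mutex.new
  end

  def records
    @numbers
  end

  def process(record)
    squared = record * record
    @lock.synchronize { @results << squared }
  end
end

squares = SquareProcessor.new([1, 2, 3, 4, 5])
run_in_worker_threads(squares, 3)
p squares.results.sort # => [1, 4, 9, 16, 25]
```

The results are sorted before printing because the order in which threads finish is not deterministic.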

Observers

Processor supports an unlimited number of observers that watch the processing.

They can monitor running migrations and write useful information to logs, the console or a file. They can show a progress bar in your console, pack a generated report into an archive and email it to the business on success, or notify developers on failure.

Observers should respond to the update method. If you inherit from Processor::Observers::NullObserver, you also get a set of methods to use, such as before_ and after_ processing hooks and error handling methods. See Processor::Observers::Logger for an example.
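A bare-bones observer might look like the sketch below. The update(event, *details) signature and the event names are assumptions made for illustration; check NullObserver for the arguments the gem actually passes. EventCounter is a hypothetical class and the gem is not loaded.

```ruby
# Sketch of a minimal observer (assumed update signature).
class EventCounter
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Count how often each event was reported; extra details are ignored.
  def update(event, *_details)
    @counts[event] += 1
  end
end

counter = EventCounter.new
# Simulate notifications a runner might send (event names are made up).
counter.update(:before_record_processing, "record 1")
counter.update(:after_record_processing, "record 1")
counter.update(:before_record_processing, "record 2")
p counter.counts[:before_record_processing] # => 2
```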

See the Processor Thread section below for how to use observers with a runner.

Processor Thread

Processor::Thread is a Facade. It simplifies access to all Processor classes and provides a stable interface.

Creating a new Thread:

Processor::Thread.new data_processor

You may provide optional observers:

Processor::Thread.new data_processor, observer1, observer2, ...

An instance has a run_as method that accepts a block:

thread = Processor::Thread.new @migration
thread.run_as do |processor, *|
  processor.records.each do |record|
    processor.process record
  end
end

The block can accept the following arguments: processor, events and a recursion_preventer method. The last one can be called to prevent recursion:

recursion_preventer.call

An instance has a run_successive method:

data_processor = UserLocationMigration.new
thread = Processor::Thread.new data_processor
thread.run_successive

And a run_in_threads method:

data_processor = UserCsvImport.new csv_file
thread = Processor::Thread.new data_processor
thread.run_in_threads 10

See spec/processor/thread_spec.rb, spec/example_spec.rb and the example directory for other usage examples.

It is recommended to wrap Processor::Thread in classes with names like:

WeeklyReport
TaxonomyMigration
UserDataImport

The point is to hide the configuration of observers and to use (if you wish) your own API to run reports or migrations:

weekly_report.create_and_deliver
user_data_import.import_from_csv(file)
etc.
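One possible shape for such a wrapper is sketched below. The runner class is injected so that the example runs without the gem; in an application it would presumably be Processor::Thread. RecordingThread is a hypothetical stand-in that only records how it was called, and build_data_processor is a placeholder for whatever data processor your wrapper creates.

```ruby
# Sketch of a wrapper that hides runner and observer wiring.
class UserDataImport
  def initialize(thread_class, observers = [])
    @thread_class = thread_class
    @observers = observers
  end

  # The public API callers see; the run mode is an internal detail.
  def import_from_csv(file)
    data_processor = build_data_processor(file)
    @thread_class.new(data_processor, *@observers).run_successive
  end

  private

  # Placeholder: a real wrapper might build a CSV data processor here.
  def build_data_processor(file)
    file
  end
end

# Hypothetical stand-in runner that only records how it was called.
class RecordingThread
  class << self
    attr_accessor :calls
  end
  self.calls = []

  def initialize(data_processor, *observers)
    @data_processor = data_processor
    @observers = observers
  end

  def run_successive
    self.class.calls << [@data_processor, @observers.size]
  end
end

import = UserDataImport.new(RecordingThread, [:logger, :mailer])
import.import_from_csv("users.csv")
p RecordingThread.calls # => [["users.csv", 2]]
```

Callers only see import_from_csv; the choice of observers and run mode stays inside the wrapper and can change without touching them.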

It is possible to use it raw, but please don't be afraid to add a wrapper class like CsvUserImport for this:

csv_data_processor = Processor::Data::CsvProcessor.new file
stdout_notifier = Processor::Observer::Logger.new(Logger.new(STDOUT))
logger_observer = Processor::Observer::Logger.new
Processor::Thread.new(
  csv_data_processor,
  stdout_notifier,
  logger_observer,
  email_notification_observer
).run_in_threads 5

More documentation can be found by running

rspec

Compatibility

Tested with Ruby

  • 1.9.3
  • rbx-19mode
  • ruby-head

See the build history.

Contributing

  1. Fork the repository AlexParamonov/processor
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Copyright © 2013 Alexander Paramonov. Released under the MIT License. See the LICENSE file for further details.