Module: FileProcessingJob

Defined in:
lib/fpj/client.rb,
lib/fpj/server.rb,
lib/fpj/connection.rb,
lib/file-processing-job.rb

Overview

Add special parsers: parse JSON files, load XML files into Nokogiri, etc. Check the EventMachine forum to see whether anyone has already done this.

Defined Under Namespace

Modules: Client, Server
Classes: Connection

Class Method Summary

Class Method Details

.connect(host = '127.0.0.1', port = 11222, callback) ⇒ Object

# File 'lib/fpj/client.rb', line 5

def self.connect(host='127.0.0.1',port=11222,callback)
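  @@running = true
  # EM::run blocks until the event loop is stopped; keep reconnecting
  # until disconnect clears @@running
  while (@@running)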
    client_connection = Client::Connection
    client_connection.callback = callback
    EM::run { EventMachine::connect host, port, client_connection }
  end
end

.disconnect ⇒ Object

# File 'lib/fpj/client.rb', line 14

def self.disconnect
  @@running = false
  EventMachine::stop_event_loop
end
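The interaction between connect and disconnect can be exercised without a network. The following is a minimal sketch using only core Ruby: ReconnectLoop and its methods are hypothetical, an instance variable replaces the original @@running class variable, and each yield stands in for one blocking run of the EventMachine reactor.

```ruby
# Hypothetical stand-in for the reconnect loop in FileProcessingJob.connect.
class ReconnectLoop
  attr_reader :sessions

  def initialize
    @running = false
    @sessions = 0
  end

  # Start a new session each time the previous one returns, until
  # #disconnect clears the flag. Each yield stands in for one blocking
  # EM::run { EventMachine::connect ... } call.
  def connect
    @running = true
    while @running
      @sessions += 1
      yield self
    end
  end

  # Clearing the flag first means the loop exits as soon as the current
  # session returns; the real method also calls EventMachine::stop_event_loop.
  def disconnect
    @running = false
  end
end

runner = ReconnectLoop.new
runner.connect do |conn|
  # Simulate two dropped connections followed by a deliberate disconnect.
  conn.disconnect if conn.sessions == 3
end
puts runner.sessions # prints 3
```

Because the flag is checked only after the reactor returns, calling disconnect from inside a session ends the loop cleanly instead of triggering another reconnect.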

.logger ⇒ Object

# File 'lib/file-processing-job.rb', line 7

def self.logger
  @@logger ||= begin
    if (const_defined?('Rails'))
      Rails.logger
    else
      require 'logger'
      # Dir.exists? was removed in Ruby 3.2; Dir.exist? is the supported name
      Dir.mkdir('log') unless Dir.exist?('log')
      Logger.new('log/fileprocessingjob.log')
    end
  end
end

.server_config ⇒ Object

FileProcessingJob::start_server creates an EventMachine server on the specified IP address and port. The optional configuration block passed to start_server receives a configuration object that can be used to point FileProcessingJob at the directories used to manage files.

By default, FileProcessingJob uses the following subdirectories to manage files. If the directories do not exist, they are created automatically at runtime.

./data/inbox
./data/processing
./data/processed

Files placed in the “inbox” directory are automatically detected and sent to a worker for processing.
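The implementation of Server.create_directories is not shown on this page. The following is a minimal sketch of creating the default layout on demand, assuming the paths are held in a plain hash; DEFAULT_DIRECTORIES and create_directories here are hypothetical names, not the gem's API.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical defaults mirroring the documented directory layout.
DEFAULT_DIRECTORIES = {
  inbox_directory:      './data/inbox',
  processing_directory: './data/processing',
  processed_directory:  './data/processed'
}.freeze

# Create each configured directory (and any missing parents) if it does
# not exist yet. FileUtils.mkdir_p does nothing for existing directories.
def create_directories(dirs)
  dirs.each_value { |path| FileUtils.mkdir_p(path) }
end

# Try it in a throwaway directory so nothing is left behind.
Dir.mktmpdir do |root|
  Dir.chdir(root) do
    create_directories(DEFAULT_DIRECTORIES)
    create_directories(DEFAULT_DIRECTORIES) # safe to call twice
    DEFAULT_DIRECTORIES.each_value { |path| puts "#{path}: #{Dir.exist?(path)}" }
  end
end
```

Using mkdir_p rather than Dir.mkdir also creates the intermediate ./data directory, which a single Dir.mkdir call would not.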

Logging

By default, the server and clients use the Rails logging facility when deployed within a Rails application. Otherwise, a logger is created that writes to ./log/fileprocessingjob.log.

Usage example

The server monitors the inbox directory for new files and dispatches
the contents of each file to the next available worker. The worker
then processes the file or raises an exception, and the server handles
moving the file to either the processed or the error directory. Any number
of workers can run anywhere on the network. Note that by default
the server binds to, and the client connects to, 127.0.0.1 on port 11222.
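The file lifecycle described above can be sketched locally, without EventMachine or a network. This is a hypothetical illustration, not the server's code: process_one and the keys of the dirs hash are invented for the example, and the name of the error directory is an assumption, since it does not appear in the default layout.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical sketch of the inbox -> processing -> processed/error flow.
# The dirs hash keys, including the :error directory, are assumptions.
def process_one(filename, dirs)
  processing_path = File.join(dirs[:processing], filename)
  # claim the file by moving it out of the inbox first
  FileUtils.mv(File.join(dirs[:inbox], filename), processing_path)
  begin
    yield File.read(processing_path) # the worker receives the file contents
    FileUtils.mv(processing_path, dirs[:processed])
    :processed
  rescue StandardError => e
    # a raised exception marks the file as failed
    warn "#{filename} failed: #{e.message}"
    FileUtils.mv(processing_path, dirs[:error])
    :error
  end
end

Dir.mktmpdir do |root|
  dirs = %i[inbox processing processed error].to_h { |key| [key, File.join(root, key.to_s)] }
  dirs.each_value { |dir| FileUtils.mkdir_p(dir) }
  File.write(File.join(dirs[:inbox], 'good.txt'), 'hello')
  File.write(File.join(dirs[:inbox], 'bad.txt'), 'oops')

  p process_one('good.txt', dirs) { |data| data.upcase }  # prints :processed
  p process_one('bad.txt', dirs) { raise 'cannot parse' } # prints :error
end
```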

require 'rubygems'
require 'file-processing-job'

Server:

FileProcessingJob::start_server() {|config|
  config.inbox_directory = './data/inbox'
  config.processing_directory = './data/processing'
  config.processed_directory = './data/processed'
}

Client:

class FileProcessor
  def receive_file data
    puts "received file data: #{data}"
    # do something interesting here
  end
end

FileProcessingJob::connect('127.0.0.1', 11222, FileProcessor)
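The overview mentions adding special parsers for formats such as JSON. Until then, a worker can parse the data itself. The following is a minimal sketch, assuming receive_file is given the whole file contents as a String; JsonFileProcessor is hypothetical, and it is called directly here rather than through FileProcessingJob::connect, because how Client::Connection invokes the callback is not shown on this page.

```ruby
require 'json'

# Hypothetical worker callback that treats each file's contents as JSON.
# Invalid input raises JSON::ParserError, which, per the usage description
# above, is how a worker reports that a file could not be processed.
class JsonFileProcessor
  def receive_file(data)
    record = JSON.parse(data)
    puts "received #{record.size} top-level entries"
    record
  end
end

JsonFileProcessor.new.receive_file('{"id": 1, "name": "report.csv"}')
# prints "received 2 top-level entries"
```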

# File 'lib/fpj/server.rb', line 61

def self.server_config
end
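start_server hands its block to Server.config and later reads Server::config.inbox_directory without a block, but neither Server.config nor its configuration class is shown on this page (server_config above has an empty body). The following is a minimal sketch of that configure-by-block pattern, assuming a Struct holds the settings; ServerConfig and ServerSketch are hypothetical names.

```ruby
# Hypothetical config object with the three documented directory settings.
ServerConfig = Struct.new(:inbox_directory, :processing_directory, :processed_directory)

module ServerSketch
  # Return the shared config, creating it with the documented defaults on
  # first use; when a block is given, yield the config so callers can change it.
  def self.config
    @config ||= ServerConfig.new('./data/inbox', './data/processing', './data/processed')
    yield @config if block_given?
    @config
  end
end

ServerSketch.config { |c| c.inbox_directory = '/srv/fpj/inbox' }
p ServerSketch.config.inbox_directory     # prints "/srv/fpj/inbox"
p ServerSketch.config.processed_directory # prints "./data/processed"
```

Memoizing the object means settings applied in the block persist for later block-less reads, which matches how start_server uses the configuration.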

.start_server(host = '127.0.0.1', port = 11222, &block) ⇒ Object

# File 'lib/fpj/server.rb', line 68

def self.start_server(host='127.0.0.1',port=11222,&block)
  Server.config(&block) if block

  # create the directories if they do not exist
  Server.create_directories

  # manually load existing files in case there is a large backlog
  # the directory_watcher interface does not handle large volumes
  # well on startup
  Dir.entries(Server::config.inbox_directory).each do |filename|
    unless (File.directory?(File.join(Server::config.inbox_directory, filename)))
      Server::Connection.push(filename) unless filename =~ /^\./
    end
  end

  # watch the inbox directory for additions; :pre_load => true snapshots the
  # files already present (queued above) so they do not raise :added events
  dw = DirectoryWatcher.new Server::config.inbox_directory, :glob => '**.*', :pre_load => true
  dw.add_observer do |*args|
    args.each do |event|
      if (event.type == :added)
        # queue the path relative to the inbox directory
        filename = event.path.sub(Server::config.inbox_directory, '').sub(/^\//, '')
        Server::Connection.push(filename)
      end
    end
  end
  dw.start
  
  # run the server
  EM::run {

    # start the server
    EM::start_server host, port, Server::Connection
    FileProcessingJob.logger.info "server started: #{host}:#{port}"
  }
  dw.stop
end
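The two filename steps in start_server, the start-up backlog scan and the relative-path calculation in the watcher callback, can be checked in isolation. The following is a minimal sketch; backlog_files and relative_name are hypothetical helpers, the result is sorted for a deterministic order (the original does not sort), and String#delete_prefix (Ruby 2.5+) strips only a leading match, a slightly stricter variant of the String#sub calls above.

```ruby
require 'tmpdir'

# Return the names of regular entries in the inbox, skipping
# sub-directories and dotfiles (which also excludes "." and "..").
def backlog_files(inbox)
  Dir.entries(inbox).reject do |name|
    name.start_with?('.') || File.directory?(File.join(inbox, name))
  end.sort
end

# Strip the inbox prefix and any leading slash so only the path
# relative to the inbox remains.
def relative_name(path, inbox)
  path.delete_prefix(inbox).delete_prefix('/')
end

Dir.mktmpdir do |inbox|
  File.write(File.join(inbox, 'a.csv'), '1')
  File.write(File.join(inbox, '.hidden'), '')
  Dir.mkdir(File.join(inbox, 'nested'))
  p backlog_files(inbox) # prints ["a.csv"]
end
p relative_name('./data/inbox/report.csv', './data/inbox') # prints "report.csv"
```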

.stop_server ⇒ Object

# File 'lib/fpj/server.rb', line 64

def self.stop_server
  
end