Class: Pupa::Processor
- Inherits:
- Object
- Object
- Pupa::Processor
- Extended by:
- Forwardable
- Includes:
- Helper
- Defined in:
- lib/pupa/processor.rb,
lib/pupa/processor/client.rb,
lib/pupa/processor/helper.rb,
lib/pupa/processor/yielder.rb,
lib/pupa/processor/connection.rb,
lib/pupa/processor/document_store.rb,
lib/pupa/processor/dependency_graph.rb,
lib/pupa/processor/middleware/logger.rb,
lib/pupa/processor/middleware/parse_html.rb,
lib/pupa/processor/middleware/parse_json.rb,
lib/pupa/processor/document_store/file_store.rb,
lib/pupa/processor/document_store/redis_store.rb,
lib/pupa/processor/connection_adapters/mongodb_adapter.rb,
lib/pupa/processor/connection_adapters/postgresql_adapter.rb
Overview
An abstract processor class from which specific processors inherit.
Defined Under Namespace
Modules: Helper, Middleware
Classes: Client, Connection, DependencyGraph, DocumentStore, Yielder
Instance Attribute Summary
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#report ⇒ Object
readonly
Returns the value of attribute report.
-
#store ⇒ Object
readonly
Returns the value of attribute store.
Class Method Summary
-
.add_scraping_task(task_name) ⇒ Object
Adds a scraping task to Pupa.rb.
Instance Method Summary
-
#dispatch(object) ⇒ Object
Yields the object to the transformation task for processing, e.g. saving to disk, printing to CSV, etc.
-
#dump_scraped_objects(task_name) ⇒ Hash
Dumps scraped objects to disk.
-
#get(url, params = {}) ⇒ Object
Retrieves and parses a document with a GET request.
-
#import ⇒ Object
Saves scraped objects to a database.
-
#initialize(output_dir, pipelined: false, cache_dir: nil, expires_in: 86400, value_max_bytes: 1048576, memcached_username: nil, memcached_password: nil, database_url: 'mongodb://localhost:27017/pupa', validate: true, level: 'INFO', logdev: STDOUT, faraday_options: {}, options: {}) ⇒ Processor
constructor
A new instance of Processor.
-
#post(url, params = {}) ⇒ Object
Retrieves and parses a document with a POST request.
Methods included from Helper
Constructor Details
#initialize(output_dir, pipelined: false, cache_dir: nil, expires_in: 86400, value_max_bytes: 1048576, memcached_username: nil, memcached_password: nil, database_url: 'mongodb://localhost:27017/pupa', validate: true, level: 'INFO', logdev: STDOUT, faraday_options: {}, options: {}) ⇒ Processor
Returns a new instance of Processor.
# File 'lib/pupa/processor.rb', line 36
def initialize(output_dir, pipelined: false, cache_dir: nil, expires_in: 86400, value_max_bytes: 1048576, memcached_username: nil, memcached_password: nil, database_url: 'mongodb://localhost:27017/pupa', validate: true, level: 'INFO', logdev: STDOUT, faraday_options: {}, options: {})
  @store = DocumentStore.new(output_dir, pipelined: pipelined)
  @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, value_max_bytes: value_max_bytes, memcached_username: memcached_username, memcached_password: memcached_password, level: level, logdev: logdev, faraday_options: faraday_options)
  @connection = Connection.new(database_url)
  @logger = Logger.new('pupa', level: level, logdev: logdev)
  @validate = validate
  @options = options
  @report = {}
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/pupa/processor.rb', line 17
def client
  @client
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/pupa/processor.rb', line 17
def connection
  @connection
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/pupa/processor.rb', line 17
def options
  @options
end
#report ⇒ Object (readonly)
Returns the value of attribute report.
# File 'lib/pupa/processor.rb', line 17
def report
  @report
end
#store ⇒ Object (readonly)
Returns the value of attribute store.
# File 'lib/pupa/processor.rb', line 17
def store
  @store
end
Class Method Details
.add_scraping_task(task_name) ⇒ Object
Adds a scraping task to Pupa.rb.
Defines a method whose name is identical to task_name. This method selects a method to perform the scraping task using scraping_task_method and memoizes its return value. The return value is a lazy enumerator of objects scraped by the selected method. The selected method must yield objects to populate this lazy enumerator.
For example, MyProcessor.add_scraping_task(:people) defines a people method on MyProcessor. This people method returns a lazy enumerator of objects (presumably Person objects in this case, but the enumerator can contain any object in the general case).
In MyProcessor, you would define a scrape_people method, which must yield objects to populate the lazy enumerator. Alternatively, you may override scraping_task_method to change the method selected to perform the scraping task.
The people method can then be called by transformation and import tasks.
# File 'lib/pupa/processor.rb', line 95
def self.add_scraping_task(task_name)
  self.tasks += [task_name]
  define_method(task_name) do
    ivar = "@#{task_name}"
    if instance_variable_defined?(ivar)
      instance_variable_get(ivar)
    else
      instance_variable_set(ivar, Yielder.new(&method(scraping_task_method(task_name))))
    end
  end
end
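The memoization pattern described above can be tried without Pupa.rb installed. What follows is a minimal sketch, not the library's code: MiniProcessor is a hypothetical stand-in for Pupa::Processor, Ruby's built-in Enumerator::Lazy replaces Pupa's Yielder, and the scrape_ prefix in scraping_task_method is assumed from the scrape_people convention mentioned above. Unlike in Pupa.rb, the scraping method here receives an explicit yielder argument.

```ruby
# A hypothetical stand-in for Pupa::Processor (not the library's code).
class MiniProcessor
  # Defines a method named after the task that builds its enumerator once
  # and returns the same enumerator on later calls.
  def self.add_scraping_task(task_name)
    define_method(task_name) do
      ivar = "@#{task_name}"
      if instance_variable_defined?(ivar)
        instance_variable_get(ivar)
      else
        scraper = method(scraping_task_method(task_name))
        # Assumption: Enumerator::Lazy stands in for Pupa's Yielder.
        lazy = Enumerator.new { |yielder| scraper.call(yielder) }.lazy
        instance_variable_set(ivar, lazy)
      end
    end
  end

  # Assumed default: the :people task is performed by #scrape_people.
  def scraping_task_method(task_name)
    "scrape_#{task_name}"
  end
end

class MyProcessor < MiniProcessor
  add_scraping_task :people

  # Yields plain hashes instead of Person objects to stay self-contained.
  def scrape_people(yielder)
    yielder << { name: 'Ada' }
    yielder << { name: 'Grace' }
  end
end

processor = MyProcessor.new
names = processor.people.map { |person| person[:name] }.to_a
puts names.inspect
```

Because the enumerator is stored in an instance variable, repeated calls to people return the same object, so the scraping method is not re-selected on each call.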
Instance Method Details
#dispatch(object) ⇒ Object
Yields the object to the transformation task for processing, e.g. saving to disk, printing to CSV, etc.
Note: All the good terms are taken by Ruby: return, send and yield.
# File 'lib/pupa/processor.rb', line 69
def dispatch(object)
  Fiber.yield(object)
end
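Since dispatch is a thin wrapper around Fiber.yield, the handoff between a scraping method and its consumer can be sketched with a bare Fiber. The drain helper below is hypothetical and only illustrates the mechanism; it is not how Pupa's Yielder is written, and treating nil as the end-of-stream marker is a convention of this sketch alone.

```ruby
# Same body as Processor#dispatch: suspend the current fiber and hand
# one object to whoever resumed it.
def dispatch(object)
  Fiber.yield(object)
end

# Hypothetical consumer: runs the scraping block inside a fiber and
# collects every dispatched object.
def drain(&scraper)
  fiber = Fiber.new do
    scraper.call
    nil # End-of-stream marker (a convention for this sketch only).
  end

  received = []
  # Each resume runs the block until the next dispatch (or until it ends).
  while (object = fiber.resume)
    received << object
  end
  received
end

objects = drain do
  dispatch('first')
  dispatch('second')
end
puts objects.inspect
```

The scraping block never sees the consumer; it only calls dispatch, and control returns to it when the consumer asks for the next object.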
#dump_scraped_objects(task_name) ⇒ Hash
Dumps scraped objects to disk.
# File 'lib/pupa/processor.rb', line 112
def dump_scraped_objects(task_name)
  counts = Hash.new(0)
  @store.pipelined do
    send(task_name).each do |object|
      counts[object._type] += 1
      dump_scraped_object(object)
    end
  end
  counts
end
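The returned Hash maps each object's _type to the number of objects dumped. The sketch below reproduces only that counting step; ScrapedObject and the type strings are hypothetical placeholders, and nothing is written to a document store.

```ruby
# Hypothetical stand-in for a scraped object; only _type matters here.
ScrapedObject = Struct.new(:_type, :name)

SCRAPED = [
  ScrapedObject.new('pupa/person', 'Ada'),
  ScrapedObject.new('pupa/organization', 'Analytical Society'),
  ScrapedObject.new('pupa/person', 'Grace')
].freeze

# Tallies objects per type. Hash.new(0) makes missing keys start at 0,
# so += 1 works without an explicit initialization.
def count_by_type(objects)
  counts = Hash.new(0)
  objects.each do |object|
    counts[object._type] += 1
  end
  counts
end

puts count_by_type(SCRAPED).inspect
```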
#get(url, params = {}) ⇒ Object
Retrieves and parses a document with a GET request.
# File 'lib/pupa/processor.rb', line 51
def get(url, params = {})
  client.get(url, params).body
end
#import ⇒ Object
Saves scraped objects to a database.
# File 'lib/pupa/processor.rb', line 130
def import
  @report[:import] = {}

  objects = deduplicate(load_scraped_objects)

  object_id_to_database_id = {}

  if use_dependency_graph?(objects)
    dependency_graph = build_dependency_graph(objects)

    # Replace object IDs with database IDs in foreign keys and save objects.
    dependency_graph.tsort.each do |id|
      object = objects[id]
      resolve_foreign_keys(object, object_id_to_database_id)
      # The dependency graph strategy only works if there are no foreign objects.

      database_id = import_object(object)
      object_id_to_database_id[id] = database_id
      object_id_to_database_id[database_id] = database_id
    end
  else
    size = objects.size

    # Should be O(n²). If there are foreign objects, we do not know all the
    # edges in the graph, and therefore cannot build a dependency graph or
    # derive any evaluation order.
    #
    # An exception is raised if a foreign object matches multiple documents
    # in the database. However, if a matching object is not yet saved, this
    # exception may not be raised.
    loop do
      progress_made = false

      objects.delete_if do |id,object|
        begin
          resolve_foreign_keys(object, object_id_to_database_id)
          resolve_foreign_objects(object, object_id_to_database_id)
          progress_made = true

          database_id = import_object(object)
          object_id_to_database_id[id] = database_id
          object_id_to_database_id[database_id] = database_id
        rescue Pupa::Errors::MissingDatabaseIdError
          false
        end
      end

      break if objects.empty? || !progress_made
    end

    unless objects.empty?
      raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n #{objects.values.map{|object| JSON.dump(object.foreign_properties)}.join("\n ")}"
    end
  end

  # Ensure that fingerprints uniquely identified objects.
  counts = {}
  object_id_to_database_id.each do |object_id,database_id|
    unless object_id == database_id
      (counts[database_id] ||= []) << object_id
    end
  end
  duplicates = counts.select do |_,object_ids|
    object_ids.size > 1
  end
  unless duplicates.empty?
    raise Errors::DuplicateDocumentError, "multiple objects written to same document:\n" + duplicates.map{|database_id,object_ids| " #{database_id} <- #{object_ids.join(' ')}"}.join("\n")
  end
end
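The dependency graph branch of import relies on a topological sort, so that every object is saved after the objects its foreign keys point to. The sketch below illustrates that ordering with Ruby's standard TSort module. SketchGraph, import_in_order, the object IDs and the "db-N" database IDs are all hypothetical; Pupa's own DependencyGraph class and import_object are not reproduced here, and the fallback loop for foreign objects is not covered.

```ruby
require 'tsort'

# Hypothetical graph: maps each object ID to the IDs it depends on.
class SketchGraph < Hash
  include TSort

  # TSort needs a way to visit every node and each node's children.
  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    fetch(node).each(&block)
  end
end

GRAPH = SketchGraph.new
GRAPH['membership-1'] = ['person-1', 'organization-1']
GRAPH['person-1'] = []
GRAPH['organization-1'] = []

# tsort lists dependencies before the objects that depend on them.
IMPORT_ORDER = GRAPH.tsort

# Walks the sorted IDs and assigns a made-up database ID to each one.
def import_in_order(graph)
  object_id_to_database_id = {}
  graph.tsort.each_with_index do |id, index|
    # Every dependency was imported earlier, so its database ID is known;
    # fetch would raise KeyError otherwise.
    graph[id].each { |dependency| object_id_to_database_id.fetch(dependency) }
    object_id_to_database_id[id] = "db-#{index}"
  end
  object_id_to_database_id
end

puts IMPORT_ORDER.inspect
puts import_in_order(GRAPH).inspect
```

If the graph contains a cycle, tsort raises TSort::Cyclic, so this strategy only applies when the foreign keys form an acyclic graph.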
#post(url, params = {}) ⇒ Object
Retrieves and parses a document with a POST request.
# File 'lib/pupa/processor.rb', line 60
def post(url, params = {})
  client.post(url, params).body
end