Class: Pupa::Processor

Inherits:
Object
Extended by:
Forwardable
Includes:
Helper
Defined in:
lib/pupa/processor.rb,
lib/pupa/processor/client.rb,
lib/pupa/processor/helper.rb,
lib/pupa/processor/yielder.rb,
lib/pupa/processor/connection.rb,
lib/pupa/processor/document_store.rb,
lib/pupa/processor/dependency_graph.rb,
lib/pupa/processor/middleware/logger.rb,
lib/pupa/processor/middleware/parse_html.rb,
lib/pupa/processor/middleware/parse_json.rb,
lib/pupa/processor/document_store/file_store.rb,
lib/pupa/processor/document_store/redis_store.rb,
lib/pupa/processor/connection_adapters/mongodb_adapter.rb,
lib/pupa/processor/connection_adapters/postgresql_adapter.rb

Overview

An abstract processor class from which specific processors inherit.

Defined Under Namespace

Modules: Helper, Middleware
Classes: Client, Connection, DependencyGraph, DocumentStore, Yielder

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Helper

#clean

Constructor Details

#initialize(output_dir, pipelined: false, cache_dir: nil, expires_in: 86400, value_max_bytes: 1048576, memcached_username: nil, memcached_password: nil, database_url: 'mongodb://localhost:27017/pupa', validate: true, level: 'INFO', logdev: STDOUT, faraday_options: {}, options: {}) ⇒ Processor

Returns a new instance of Processor.

Parameters:

  • output_dir (String)

    the directory or Redis address (e.g. redis://localhost:6379) in which to dump JSON documents

  • pipelined (Boolean) (defaults to: false)

    whether to dump JSON documents all at once

  • cache_dir (String) (defaults to: nil)

    the directory or Memcached address (e.g. memcached://localhost:11211) in which to cache HTTP responses

  • expires_in (Integer) (defaults to: 86400)

    the cache's expiration time in seconds

  • value_max_bytes (Integer) (defaults to: 1048576)

    the maximum Memcached item size, in bytes

  • memcached_username (String) (defaults to: nil)

    the Memcached username

  • memcached_password (String) (defaults to: nil)

    the Memcached password

  • database_url (String) (defaults to: 'mongodb://localhost:27017/pupa')

    the database URL

  • validate (Boolean) (defaults to: true)

    whether to validate JSON documents

  • level (String) (defaults to: 'INFO')

    the log level

  • logdev (String, IO) (defaults to: STDOUT)

    the log device

  • faraday_options (Hash) (defaults to: {})

    Faraday initialization options

  • options (Hash) (defaults to: {})

    criteria for selecting the methods to run
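Two of these parameters accept either a local path or a service address: output_dir may be a directory or a Redis URL, and cache_dir a directory or a Memcached URL. A minimal sketch of how such a string could be classified by its URL scheme follows; backend_for is a hypothetical helper, and Pupa's own DocumentStore and Client may apply different rules.

```ruby
require 'uri'

# Hypothetical helper: guesses which backend a location string refers to,
# based only on its URL scheme. Pupa's DocumentStore and Client may differ.
def backend_for(location)
  return :none if location.nil? # e.g. the default cache_dir: nil

  case URI(location).scheme
  when 'redis'     then :redis
  when 'memcached' then :memcached
  else :filesystem # plain paths such as '/tmp/pupa' have no scheme
  end
rescue URI::InvalidURIError
  :filesystem # strings that are not valid URIs are treated as directories
end

backend_for('redis://localhost:6379')      # => :redis
backend_for('memcached://localhost:11211') # => :memcached
backend_for('/tmp/pupa')                   # => :filesystem
```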



# File 'lib/pupa/processor.rb', line 36

def initialize(output_dir, pipelined: false, cache_dir: nil, expires_in: 86400, value_max_bytes: 1048576, memcached_username: nil, memcached_password: nil, database_url: 'mongodb://localhost:27017/pupa', validate: true, level: 'INFO', logdev: STDOUT, faraday_options: {}, options: {})
  @store      = DocumentStore.new(output_dir, pipelined: pipelined)
  @client     = Client.new(cache_dir: cache_dir, expires_in: expires_in, value_max_bytes: value_max_bytes, memcached_username: memcached_username, memcached_password: memcached_password, level: level, logdev: logdev, faraday_options: faraday_options)
  @connection = Connection.new(database_url)
  @logger     = Logger.new('pupa', level: level, logdev: logdev)
  @validate   = validate
  @options    = options
  @report     = {}
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/pupa/processor.rb', line 17

def client
  @client
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



# File 'lib/pupa/processor.rb', line 17

def connection
  @connection
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/pupa/processor.rb', line 17

def options
  @options
end

#report ⇒ Object (readonly)

Returns the value of attribute report.



# File 'lib/pupa/processor.rb', line 17

def report
  @report
end

#store ⇒ Object (readonly)

Returns the value of attribute store.



# File 'lib/pupa/processor.rb', line 17

def store
  @store
end
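All five readers above are read-only and point at the same source line (17), which is consistent with a single attr_reader declaration. The sketch below is a hypothetical reconstruction under that assumption, using a made-up ProcessorSketch class rather than Pupa's own code.

```ruby
# Hypothetical reconstruction: one attr_reader line generates all five
# read-only accessors, each equivalent to `def client; @client; end`.
class ProcessorSketch
  attr_reader :store, :client, :connection, :report, :options

  def initialize(options = {})
    @options = options
    @report  = {}
  end
end

processor = ProcessorSketch.new(action: 'scrape')
processor.options[:action]      # => "scrape"
processor.respond_to?(:report=) # => false (no writer is defined)
processor.store                 # => nil (never assigned in this sketch)
```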

Class Method Details

.add_scraping_task(task_name) ⇒ Object

Adds a scraping task to Pupa.rb.

Defines a method whose name is identical to task_name. This method selects a method to perform the scraping task using scraping_task_method and memoizes its return value. The return value is a lazy enumerator of objects scraped by the selected method. The selected method must yield objects to populate this lazy enumerator.

For example, MyProcessor.add_scraping_task(:people) defines a people method on MyProcessor. This people method returns a lazy enumerator of objects (presumably Person objects in this case, but the enumerator can contain any object in the general case).

In MyProcessor, you would define a scrape_people method, which must yield objects to populate the lazy enumerator. Alternatively, you may override scraping_task_method to change the method selected to perform the scraping task.

The people method can then be called by transformation and import tasks.

Parameters:

  • task_name (Symbol)

    a task name

See Also:

  • #scraping_task_method


# File 'lib/pupa/processor.rb', line 95

def self.add_scraping_task(task_name)
  self.tasks += [task_name]
  define_method(task_name) do
    ivar = "@#{task_name}"
    if instance_variable_defined?(ivar)
      instance_variable_get(ivar)
    else
      instance_variable_set(ivar, Yielder.new(&method(scraping_task_method(task_name))))
    end
  end
end
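A self-contained sketch of the same memoization pattern, runnable without Pupa. It assumes scraping_task_method maps :people to scrape_people, as described above, and it swaps Pupa's Fiber-based Yielder for Ruby's built-in Enumerator, so the scraping method hands objects back with a plain block yield instead of dispatch. MiniProcessor and PeopleProcessor are hypothetical names.

```ruby
# Hypothetical base class reproducing the add_scraping_task pattern.
class MiniProcessor
  class << self
    attr_writer :tasks

    # Per-class list of registered task names.
    def tasks
      @tasks ||= []
    end

    def add_scraping_task(task_name)
      self.tasks += [task_name]
      define_method(task_name) do
        ivar = "@#{task_name}"
        if instance_variable_defined?(ivar)
          instance_variable_get(ivar) # memoized enumerator
        else
          method_name = scraping_task_method(task_name)
          # Enumerator replaces Pupa's Yielder in this sketch.
          enum = Enumerator.new do |yielder|
            send(method_name) { |object| yielder << object }
          end
          instance_variable_set(ivar, enum.lazy)
        end
      end
    end
  end

  # Assumed convention from the text: :people -> "scrape_people".
  def scraping_task_method(task_name)
    "scrape_#{task_name}"
  end
end

class PeopleProcessor < MiniProcessor
  add_scraping_task(:people)

  def scrape_people
    yield({ name: 'Alice' })
    yield({ name: 'Bob' })
  end
end

processor = PeopleProcessor.new
processor.people.map { |person| person[:name] }.to_a # => ["Alice", "Bob"]
processor.people.equal?(processor.people)             # => true
```

In this sketch the enumerator is lazy, so scrape_people runs only when the results are consumed; repeated calls to people return the same enumerator object, although each full iteration re-runs scrape_people.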

Instance Method Details

#dispatch(object) ⇒ Object

Note:

All the good terms are taken by Ruby: return, send and yield.

Yields the object to the transformation task for processing, e.g. saving to disk, printing to CSV, etc.

Parameters:

  • object (Object)

    an object



# File 'lib/pupa/processor.rb', line 69

def dispatch(object)
  Fiber.yield(object)
end
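dispatch only works while the scraping method is running inside a Fiber, which is presumably what Yielder (listed under this namespace) provides. The sketch below assumes that design: MiniYielder is a hypothetical stand-in that runs a producer block in a fresh Fiber and passes each Fiber.yield value to the caller, and DemoProcessor is a made-up class.

```ruby
# Hypothetical stand-in for Pupa::Processor::Yielder: runs a producer block
# in a Fiber and passes each Fiber.yield value to the caller's block.
class MiniYielder
  include Enumerable

  DONE = Object.new.freeze # marks the end of the producer

  def initialize(&block)
    @block = block
  end

  def each
    return enum_for(:each) unless block_given?

    fiber = Fiber.new do
      @block.call
      DONE
    end
    loop do
      value = fiber.resume # runs the producer until its next Fiber.yield
      break if value.equal?(DONE)
      yield value
    end
    self
  end
end

class DemoProcessor
  # Same body as Pupa's dispatch: suspend the Fiber and hand over the object.
  def dispatch(object)
    Fiber.yield(object)
  end

  def scrape_people
    dispatch({ name: 'Alice' })
    dispatch({ name: 'Bob' })
  end
end

processor = DemoProcessor.new
people = MiniYielder.new { processor.scrape_people }
people.map { |person| person[:name] } # => ["Alice", "Bob"]
```

Calling dispatch outside such a Fiber raises FiberError, because the root fiber has nothing to yield to.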

#dump_scraped_objects(task_name) ⇒ Hash

Dumps scraped objects to the document store: a directory on disk or, if output_dir is a Redis address, a Redis database.

Parameters:

  • task_name (Symbol)

    the name of the scraping task to perform

Returns:

  • (Hash)

    the number of scraped objects by type



# File 'lib/pupa/processor.rb', line 112

def dump_scraped_objects(task_name)
  counts = Hash.new(0)
  @store.pipelined do
    send(task_name).each do |object|
      counts[object._type] += 1
      dump_scraped_object(object)
    end
  end
  counts
end
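The counting logic can be exercised on its own. This sketch keeps the Hash.new(0) tally and the pipelined wrapper, but replaces the document store with a hypothetical in-memory MemoryStore and the scraped objects with a Struct exposing _type. The real dump_scraped_object is not shown in this section, so the sketch simply records each object.

```ruby
# Hypothetical scraped object; Pupa objects expose _type, a Struct suffices here.
ScrapedObject = Struct.new(:_type, :name)

# Hypothetical in-memory stand-in for DocumentStore.
class MemoryStore
  attr_reader :documents

  def initialize
    @documents = []
  end

  # A pipelined store could buffer writes until the block ends; this one
  # simply runs the block.
  def pipelined
    yield
  end

  def write(object)
    @documents << object
  end
end

def dump_scraped_objects(store, objects)
  counts = Hash.new(0) # unseen types start at zero
  store.pipelined do
    objects.each do |object|
      counts[object._type] += 1
      store.write(object)
    end
  end
  counts
end

store = MemoryStore.new
objects = [
  ScrapedObject.new('person', 'Alice'),
  ScrapedObject.new('person', 'Bob'),
  ScrapedObject.new('organization', 'City Council'),
]
counts = dump_scraped_objects(store, objects)
counts['person']       # => 2
counts['organization'] # => 1
```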

#get(url, params = {}) ⇒ Object

Retrieves and parses a document with a GET request.

Parameters:

  • url (String)

    a URL to an HTML document

  • params (String, Hash) (defaults to: {})

    query string parameters

Returns:

  • a parsed document



# File 'lib/pupa/processor.rb', line 51

def get(url, params = {})
  client.get(url, params).body
end
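get passes params straight to the client; with Faraday, the second argument to get is merged into the URL's query string. The sketch below only illustrates that encoding with Ruby's standard library; url_with_query is a hypothetical helper, not part of Pupa or Faraday.

```ruby
require 'uri'

# Hypothetical helper: appends params to a URL's query string, accepting
# either a Hash or a pre-encoded String, like get's params argument.
def url_with_query(url, params = {})
  return url if params.nil? || params.empty?

  query = params.is_a?(String) ? params : URI.encode_www_form(params)
  uri = URI(url)
  uri.query = [uri.query, query].compact.join('&') # keep any existing query
  uri.to_s
end

url_with_query('https://example.com/people', { page: 2, q: 'smith' })
# => "https://example.com/people?page=2&q=smith"
```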

#import ⇒ Object

Saves scraped objects to a database.
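When use_dependency_graph? is true, import saves objects in the order returned by dependency_graph.tsort, so each object is saved after the objects its foreign keys point to. A minimal sketch of that ordering with Ruby's TSort module, assuming the dependency graph behaves like a Hash from each object ID to the IDs it references (SketchDependencyGraph, the sample objects, and the database IDs are hypothetical):

```ruby
require 'tsort'

# Hypothetical stand-in for DependencyGraph: object ID => IDs it references.
class SketchDependencyGraph < Hash
  include TSort

  alias tsort_each_node each_key

  def tsort_each_child(node, &block)
    fetch(node).each(&block)
  end
end

# Sample objects reduced to foreign keys (field => referenced object ID).
objects = {
  'membership-1' => { organization_id: 'org-1', person_id: 'person-1' },
  'person-1'     => {},
  'org-1'        => {},
}

graph = SketchDependencyGraph.new
objects.each { |id, foreign_keys| graph[id] = foreign_keys.values }

object_id_to_database_id = {}
saved = {}
graph.tsort.each_with_index do |id, index|
  # tsort lists referenced objects first, so fetch never misses here.
  saved[id] = objects[id].transform_values { |ref| object_id_to_database_id.fetch(ref) }
  object_id_to_database_id[id] = "db-#{index + 1}" # pretend database ID
end
```

If the references form a cycle, TSort raises TSort::Cyclic instead of returning an order.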



# File 'lib/pupa/processor.rb', line 130

def import
  @report[:import] = {}

  objects = deduplicate(load_scraped_objects)

  object_id_to_database_id = {}

  if use_dependency_graph?(objects)
    dependency_graph = build_dependency_graph(objects)

    # Replace object IDs with database IDs in foreign keys and save objects.
    dependency_graph.tsort.each do |id|
      object = objects[id]
      resolve_foreign_keys(object, object_id_to_database_id)
      # The dependency graph strategy only works if there are no foreign objects.

      database_id = import_object(object)
      object_id_to_database_id[id] = database_id
      object_id_to_database_id[database_id] = database_id
    end
  else
    size = objects.size

    # Worst case O(n²): each pass scans every remaining object but may import
    # only one of them. If there are foreign objects, we do not know all the
    # edges in the graph, and therefore cannot build a dependency graph or
    # derive any evaluation order.
    #
    # An exception is raised if a foreign object matches multiple documents
    # in the database. However, if a matching object is not yet saved, this
    # exception may not be raised.
    loop do
      progress_made = false

      objects.delete_if do |id,object|
        begin
          resolve_foreign_keys(object, object_id_to_database_id)
          resolve_foreign_objects(object, object_id_to_database_id)
          progress_made = true

          database_id = import_object(object)
          object_id_to_database_id[id] = database_id
          object_id_to_database_id[database_id] = database_id
        rescue Pupa::Errors::MissingDatabaseIdError
          false
        end
      end

      break if objects.empty? || !progress_made
    end

    unless objects.empty?
      raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n  #{objects.values.map{|object| JSON.dump(object.foreign_properties)}.join("\n  ")}"
    end
  end

  # Ensure that fingerprints uniquely identify objects.
  counts = {}
  object_id_to_database_id.each do |object_id,database_id|
    unless object_id == database_id
      (counts[database_id] ||= []) << object_id
    end
  end
  duplicates = counts.select do |_,object_ids|
    object_ids.size > 1
  end
  unless duplicates.empty?
    raise Errors::DuplicateDocumentError, "multiple objects written to same document:\n" + duplicates.map{|database_id,object_ids| "  #{database_id} <- #{object_ids.join(' ')}"}.join("\n")
  end
end
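When foreign objects rule out a dependency graph, import instead retries unresolved objects pass after pass until every object is saved or a pass makes no progress. A simplified sketch of that fixed-point loop follows, with objects reduced to foreign-key hashes, made-up database IDs, and a local MissingDatabaseIdError standing in for Pupa::Errors::MissingDatabaseIdError; import_in_passes is hypothetical.

```ruby
# Stand-in for Pupa::Errors::MissingDatabaseIdError.
class MissingDatabaseIdError < StandardError; end

# Hypothetical simplified importer. objects maps each object ID to its
# foreign keys (field name => referenced object ID).
def import_in_passes(objects)
  pending = objects.dup
  object_id_to_database_id = {}
  saved = {}

  loop do
    progress_made = false

    pending.delete_if do |id, foreign_keys|
      begin
        # Raises if a referenced object has not been saved yet.
        saved[id] = foreign_keys.transform_values do |ref|
          object_id_to_database_id.fetch(ref) { raise MissingDatabaseIdError, ref }
        end
        progress_made = true
        object_id_to_database_id[id] = "db-#{object_id_to_database_id.size + 1}" # pretend insert
        true # remove from pending
      rescue MissingDatabaseIdError
        false # keep for the next pass
      end
    end

    # Stop when everything is saved or a full pass saved nothing new.
    break if pending.empty? || !progress_made
  end

  raise "couldn't resolve #{pending.size}/#{objects.size} objects" unless pending.empty?

  [object_id_to_database_id, saved]
end

ids, saved = import_in_passes({ 'membership-1' => { organization_id: 'org-1' }, 'org-1' => {} })
saved['membership-1'][:organization_id] == ids['org-1'] # => true
```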

#post(url, params = {}) ⇒ Object

Retrieves and parses a document with a POST request.

Parameters:

  • url (String)

    a URL to an HTML document

  • params (String, Hash) (defaults to: {})

    request parameters, sent as the POST request body rather than as a query string

Returns:

  • a parsed document



# File 'lib/pupa/processor.rb', line 60

def post(url, params = {})
  client.post(url, params).body
end
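Unlike get, post hands params to Faraday's post, whose second positional argument is the request body, so the parameters travel in the body rather than the query string. Whether a Hash body is form-encoded depends on the request middleware the client installs; assuming form encoding, the sketch below shows the resulting body using Net::HTTP from the standard library. form_post is a hypothetical helper, and no request is sent.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper: builds (but does not send) a POST request whose
# form-encoded body carries the params.
def form_post(url, params)
  request = Net::HTTP::Post.new(URI(url))
  request.set_form_data(params) # sets body and Content-Type
  request
end

request = form_post('https://example.com/search', { 'q' => 'smith', 'page' => '2' })
request.body         # => "q=smith&page=2"
request.content_type # => "application/x-www-form-urlencoded"
request.path         # => "/search" (no query string)
```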