Class: Nsync::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/nsync/consumer.rb

Overview

The Nsync::Consumer is used to handle the consumption of data from an Nsync repo for the entire app. It reads in the differences between the current version of data in the database and the new data from the producer, finding and notifying all affected classes and objects.

Basic Usage:

Nsync::Config.run do |c|
  # The consumer uses a read-only, bare repository (one ending in .git)
  # This will automatically be created if it does not exist
  c.repo_path = "/local/path/to/hold/data.git"
  # The remote repository url from which to pull data
  c.repo_url = "git@examplegithost:username/data.git"

  # An object that implements the VersionManager interface 
  # (see Nsync::GitVersionManager) for an example
  c.version_manager = MyCustomVersionManager.new

  # A lock file path to use for this app
  c.lock_file = "/tmp/app_name_nsync.lock"

  # The class mapping maps from the class names of the producer classes to
  # the class names of their associated consuming classes. A producer can
  # map to one or many consumers, and a consumer can be mapped to one or many
  # producers. Consumer classes should implement the Consumer interface.
  c.map_class "RawDataPostClass", "Post"
  c.map_class "RawDataInfo", "Info"
end

# Create a new consumer object, this will clone the repo if needed
consumer = Nsync::Consumer.new

# update this app to the latest data, pulling if necessary
consumer.update

# rollback the last change
consumer.rollback

Direct Known Subclasses

Producer

Defined Under Namespace

Classes: Change, CouldNotInitializeRepoError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeConsumer

Sets the repository to the repo at config.repo_path

If config.repo_url is set and the directory at config.repo_path does not exist yet, a new bare repository will be cloned from config.repo_url



49
50
51
52
53
# File 'lib/nsync/consumer.rb', line 49

def initialize
  unless get_or_create_repo
    raise CouldNotInitializeRepoError
  end
end

Instance Attribute Details

#repoObject

Returns the value of attribute repo.



40
41
42
# File 'lib/nsync/consumer.rb', line 40

def repo
  @repo
end

Instance Method Details

#after_class_finished(klass, l) ⇒ Object

Adds a callback to the list of callbacks to occur after main processing of the class specified by ‘klass’. Can be used to handle data relations between objects of the same class.

Example:

class Post
  def nsync_update(consumer, event_type, filename, data)
    #... normal data update stuff ...
    post = self
    related_post_source_ids = data['related_post_ids']
    consumer.after_class_finished(Post, lambda {
      posts = Post.all(:conditions => 
        {:source_id => related_post_source_ids })
      post.related_posts = posts
    })
  end
end

Parameters:

  • klass (Class)
  • l (Proc)


205
206
207
208
209
# File 'lib/nsync/consumer.rb', line 205

def after_class_finished(klass, l)
  config.log.info("[NSYNC] Added callback to run after class '#{klass}'")
  @after_class_finished_queues[klass] ||= []
  @after_class_finished_queues[klass] << l
end

#after_current_class_finished(l) ⇒ Object

Adds a callback to the list of callbacks to occur after main processing of the class that is currently being processed. This is essentially an alias for after_class_finished for the current class

Parameters:

  • l (Proc)


216
217
218
# File 'lib/nsync/consumer.rb', line 216

def after_current_class_finished(l)
  after_class_finished(@current_class_for_queue, l)
end

#after_finished(l) ⇒ Object

Adds a callback to the list of callbacks to occur after all changes have been applied. This queue executes immediately prior to the current version being updated

Parameters:

  • l (Proc)


226
227
228
229
230
# File 'lib/nsync/consumer.rb', line 226

def after_finished(l)
  config.log.info("[NSYNC] Added callback to run at the end of the update")
  @after_finished_queue ||= []
  @after_finished_queue << l
end

#apply_changes(a, b) ⇒ Object

Translates and applies the changes between commit id ‘a’ and commit id ‘b’ to the datastore. This is used internally by rollback and update. Don’t use this unless you absolutely know what you are doing.

If you must call this directly, understand that ‘a’ should almost always be the commit id of the current data that is loaded into the database. ‘b’ can be any commit in the graph, forward or backwards.

Parameters:

  • a (String)

    current data version commit id

  • b (String)

    new data version commit id



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/nsync/consumer.rb', line 104

def apply_changes(a, b)
  return false if a == b
  config.lock do
    config.log.info("[NSYNC] Moving Nsync::Consumer from '#{a}' to '#{b}'")
    clear_queues
    diffs = nil
    diffs = repo.diff(a, b)

    changeset = changeset_from_diffs(diffs)

    if config.ordering
      config.ordering.each do |klass|
        klass = begin
            CoreExtensions.constantize(klass)
          rescue NameError => e
            config.log.warn("[NSYNC] Could not find class '#{klass}' from ordering; skipping")
            false
          end
        if klass
          changes = changeset[klass]
          if changes
            apply_changes_for_class(klass, changes)
          end
        end
      end
    else
      changeset.each do |klass, changes|
        apply_changes_for_class(klass, changes)
      end
    end
    run_after_finished
    clear_queues
    config.version_manager.version = b
  end
end

#changes(a, b) ⇒ Object



89
90
91
92
# File 'lib/nsync/consumer.rb', line 89

def changes(a, b)
  diffs = repo.diff(a,b)
  changeset_from_diffs(diffs)
end

#configNsync::Config

Returns:



79
80
81
# File 'lib/nsync/consumer.rb', line 79

def config 
  Nsync.config
end

#first_commitObject

Gets the first commit id in the repo



240
241
242
# File 'lib/nsync/consumer.rb', line 240

def first_commit
  self.repo.git.rev_list({:reverse => true}, "master").split("\n").first
end

#latest_changesObject



83
84
85
86
87
# File 'lib/nsync/consumer.rb', line 83

def latest_changes
  update_repo &&
  changes(config.version_manager.version,
          repo.head.commit.id)
end

#remotesObject

Lists the configured data remotes in the repo



233
234
235
236
237
# File 'lib/nsync/consumer.rb', line 233

def remotes
  repo.git.remote({:v => true}).split("\n").map do |line|
    line.split(/\s+/)
  end
end

#reprocess_class!(klass) ⇒ Object

Reprocesses all changes from the start of the repo to the current version for the class klass, queues will not be cleared, so you can use this to do powerful data reconstruction. You can also shoot your foot off. Be very careful



144
145
146
147
148
149
150
151
152
# File 'lib/nsync/consumer.rb', line 144

def reprocess_class!(klass)
  diffs = repo.diff(first_commit, config.version_manager.version)
  changeset = changeset_from_diffs(diffs)

  changes = changeset[klass]
  if changes
    apply_changes_for_class(klass, changes)
  end
end

#rollbackObject

Rolls back data to the previous loaded version

NOTE: If you rollback and then update, the ‘bad’ commit will then be reloaded. This is primarily meant as a way to get back to a known good state quickly, while the issues are fixed in the producer.



73
74
75
76
# File 'lib/nsync/consumer.rb', line 73

def rollback
  apply_changes(config.version_manager.version,
                config.version_manager.previous_version)
end

#updateObject

Updates the data to the latest version

If the repo has a remote origin, the latest changes will be fetched.

NOTE: It is critical that the version_manager returns correct results as this method goes from what it says is the latest commit that was loaded in to HEAD.



62
63
64
65
66
# File 'lib/nsync/consumer.rb', line 62

def update
  update_repo &&
  apply_changes(config.version_manager.version,
                repo.head.commit.id)
end