Class: RubySync::Pipelines::BasePipeline
- Includes:
- Utilities
- Defined in:
- lib/ruby_sync/pipelines/base_pipeline.rb
Overview
This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.
One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This is could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).
We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.
Class Method Summary collapse
-
.allow_in(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
-
.allow_out(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
- .client(connector_name, options = {}) ⇒ Object
- .in_transform(&blk) ⇒ Object
- .map_client_to_vault(mappings) ⇒ Object
- .map_vault_to_client(mappings) ⇒ Object
- .out_transform(&blk) ⇒ Object
- .vault(connector_name, options = {}) ⇒ Object
Instance Method Summary collapse
-
#allowed_in ⇒ Object
default allowed_in in case allow_in doesn’t get called.
-
#allowed_out ⇒ Object
default allowed_out in case allow_out doesn’t get called.
-
#association_context ⇒ Object
The context for all association keys used by this pipeline.
-
#in_create(event) ⇒ Object
Override to restrict creation on the vault.
-
#in_filter(event) ⇒ Object
Default method for allowed_in.
-
#in_handler(event) ⇒ Object
Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.
-
#in_map_schema(event) ⇒ Object
If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace.
- #in_match(event) ⇒ Object
-
#in_place(event) ⇒ Object
Override to modify the target path for creation in the vault.
-
#initialize ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #map_schema(event, map) ⇒ Object
- #name ⇒ Object
-
#out_create(event) ⇒ Object
Override to restrict creation on the client.
-
#out_event_filter(event) ⇒ Object
Override to process the event generated by the publisher before any other processing is done.
-
#out_filter(event) ⇒ Object
Default method for allowed_out.
-
#out_handler(event) ⇒ Object
Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.
- #out_map_schema(event) ⇒ Object
-
#out_match(event) ⇒ Object
Override to implement some kind of matching.
-
#out_place(event) ⇒ Object
Override to modify the target path for creation on the client.
- #perform_transform(name, event, hint = "") ⇒ Object
-
#run_in_once ⇒ Object
Execute the in pipe once and then return.
-
#run_once ⇒ Object
Execute the pipeline once then return.
-
#run_out_once ⇒ Object
Execute the out pipe once and then return.
Methods included from Utilities
#base_path, #call_if_exists, #connector_called, #ensure_dir_exists, #find_base_path, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #pipeline_called, #set_preference, #something_called, #with_rescue
Constructor Details
#initialize ⇒ BasePipeline
Returns a new instance of BasePipeline.
43 44 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 43 def initialize end |
Class Method Details
.allow_in(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
322 323 324 325 326 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 322 def self.allow_in *fields class_def 'allowed_in' do fields.map {|f| f.to_s} end end |
.allow_out(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
344 345 346 347 348 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 344 def self.allow_out *fields class_def 'allowed_out' do fields.map {|f| f.to_s } end end |
.client(connector_name, options = {}) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 50 def self.client(connector_name, ={}) class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name) [:name] ||= "#{self.name}(client)" [:is_vault] = false class_def 'client' do @client ||= eval("::#{class_name}").new() end end |
.in_transform(&blk) ⇒ Object
107 108 109 110 111 112 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 107 def self.in_transform &blk define_method :in_transform do |event| event. :transform, &blk event.transform end end |
.map_client_to_vault(mappings) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 68 def self.map_client_to_vault mappings remove_method :client_to_vault_map if method_defined? :client_to_vault_map class_def 'client_to_vault_map' do unless @client_to_vault_map @client_to_vault_map = {} mappings.each {|k,v| @client_to_vault_map[k.to_s] = v.to_s} end @client_to_vault_map end unless method_defined? :vault_to_client_map class_def 'vault_to_client_map' do @vault_to_client_map ||= client_to_vault_map.invert end end end |
.map_vault_to_client(mappings) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 84 def self.map_vault_to_client mappings remove_method :vault_to_client_map if method_defined? :vault_to_client_map class_def 'vault_to_client_map' do unless @vault_to_client_map @vault_to_client_map = {} mappings.each {|k,v| @vault_to_client_map[k.to_s] = v.to_s} end @vault_to_client_map end unless method_defined? :client_to_vault_map class_def 'client_to_vault_map' do @client_to_vault_map ||= vault_to_client_map.invert end end end |
.out_transform(&blk) ⇒ Object
100 101 102 103 104 105 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 100 def self.out_transform &blk define_method :out_transform do |event| event. :transform, &blk event.transform end end |
.vault(connector_name, options = {}) ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 59 def self.vault(connector_name, ={}) class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name) [:name] ||= "#{self.name}(vault)" [:is_vault] = true class_def 'vault' do @vault ||= eval("::" + class_name).new() end end |
Instance Method Details
#allowed_in ⇒ Object
default allowed_in in case allow_in doesn’t get called
329 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 329 def allowed_in; nil; end |
#allowed_out ⇒ Object
default allowed_out in case allow_out doesn’t get called
351 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 351 def allowed_out; nil; end |
#association_context ⇒ Object
The context for all association keys used by this pipeline. By default, defer to the client
284 285 286 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 284 def association_context @client.association_context end |
#in_create(event) ⇒ Object
Override to restrict creation on the vault
191 192 193 194 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 191 def in_create event log.debug "Create allowed through default rule" true end |
#in_filter(event) ⇒ Object
Default method for allowed_in. Override by calling allow_in def allowed_in; false; end
333 334 335 336 337 338 339 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 333 def in_filter(event) if allowed_in event.drop_all_but_changes_to allowed_in else event end end |
#in_handler(event) ⇒ Object
Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 246 def in_handler(event) event.retrieve_association(association_context) hint = " (#{client.name} => #{vault.name})" log.info "Processing incoming #{event.type} event"+hint log.info YAML.dump(event) perform_transform :in_map_schema, event, hint perform_transform :in_transform, event, hint perform_transform :in_filter, event, hint # The client can't really know whether its an add or a modify because it doesn't store # the association. if event.type == :modify event.convert_to_add unless event.associated? and vault.find_associated(event.association) elsif event.type == :add and event.associated? and vault.find_associated(event.association) event.convert_to_modify end if event.type == :add match = in_match(event) # exactly one event record in the vault matched if match event.merge(match) return end if in_create(event) perform_transform :in_place, event, hint else return end end with_rescue("#{vault.name}: Processing command") {vault.process(event)} end |
#in_map_schema(event) ⇒ Object
If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace
296 297 298 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 296 def in_map_schema event map_schema event, client_to_vault_map if respond_to? :client_to_vault_map end |
#in_match(event) ⇒ Object
288 289 290 291 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 288 def in_match event log.debug "Default match rule - source path exists in vault" vault.respond_to?(:'[]') and vault[event.source_path] end |
#in_place(event) ⇒ Object
Override to modify the target path for creation in the vault
203 204 205 206 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 203 def in_place(event) log.debug "Default placement rule target_path = source_path" event.target_path = event.source_path end |
#map_schema(event, map) ⇒ Object
304 305 306 307 308 309 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 304 def map_schema event, map return unless map and event.payload event.payload.each do |op| op.subject = map[op.subject] || op.subject if op.subject end end |
#name ⇒ Object
46 47 48 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 46 def name self.class.name end |
#out_create(event) ⇒ Object
Override to restrict creation on the client
185 186 187 188 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 185 def out_create event log.debug "Create allowed through default rule" true end |
#out_event_filter(event) ⇒ Object
Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.
243 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 243 def out_event_filter(event); true; end |
#out_filter(event) ⇒ Object
Default method for allowed_out. Override by calling allow_in def allowed_out; false; end
355 356 357 358 359 360 361 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 355 def out_filter(event) if allowed_out event.drop_all_but_changes_to allowed_out else event end end |
#out_handler(event) ⇒ Object
Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 117 def out_handler(event) event.retrieve_association(association_context) event.convert_to_modify if event.associated? and event.type == :add hint = " (#{vault.name} => #{client.name})" log.info "Processing out-going #{event.type} event #{hint}" log.info YAML.dump(event) return unless out_event_filter event # Remove unwanted attributes perform_transform :out_filter, event unless event.associated? if [:delete, :remove_association].include? event.type log.info "#{name}: No action for #{event.type} of unassociated entry" log.info YAML.dump(event) return end end if event.type == :modify unless event.associated? and client.has_entry_for_key?(event.association.key) event.convert_to_add end end if event.type == :add match = out_match(event) log.info "Attempting to match" if match # exactly one event record on the client matched log.info "Match found, merging" event.merge(match) association = Association.new(self.association_context, match.src_path) vault.associate asssociation, event.source_path return end log.info "No match found, creating" return unless out_create(event) perform_transform :out_place, event end perform_transform :out_map_schema, event perform_transform :out_transform, event association_key = nil with_rescue("#{client.name}: Processing command") do association_key = client.process(event) end if association_key association = Association.new(association_context, association_key) with_rescue("#{client.name}: Storing association #{association} in vault") do vault.associate(association, event.source_path) end end end |
#out_map_schema(event) ⇒ Object
300 301 302 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 300 def out_map_schema event map_schema event, vault_to_client_map if respond_to? :vault_to_client_map end |
#out_match(event) ⇒ Object
Override to implement some kind of matching
178 179 180 181 182 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 178 def out_match event log.debug "Default matching rule - source path exists on client?" client.respond_to?(:'[]') and client[event.source_path] false end |
#out_place(event) ⇒ Object
Override to modify the target path for creation on the client
197 198 199 200 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 197 def out_place(event) log.debug "Default placement rule target_path = source_path" event.target_path = event.source_path end |
#perform_transform(name, event, hint = "") ⇒ Object
208 209 210 211 212 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 208 def perform_transform name, event, hint="" call_if_exists name, event, hint event.commit_changes log_progress name, event, hint end |
#run_in_once ⇒ Object
Execute the in pipe once and then return
227 228 229 230 231 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 227 def run_in_once log.info "Running #{name} 'in' pipeline once" client.once_only = true client.start {|event| in_handler(event)} end |
#run_once ⇒ Object
Execute the pipeline once then return. TODO Consider making this run in and out simultaneously
220 221 222 223 224 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 220 def run_once log.info "Running #{name} pipeline once" run_in_once run_out_once end |
#run_out_once ⇒ Object
Execute the out pipe once and then return
234 235 236 237 238 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 234 def run_out_once log.info "Running #{name} 'out' pipeline once" vault.once_only = true vault.start {|event| out_handler(event)} end |