Class: RubySync::Pipelines::BasePipeline
- Includes:
- Utilities
- Defined in:
- lib/ruby_sync/pipelines/base_pipeline.rb
Overview
This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.
One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This is could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).
We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.
Instance Attribute Summary collapse
-
#delay ⇒ Object
delay in seconds between checking connectors.
Class Method Summary collapse
-
.allow_in(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
-
.allow_out(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
- .client(connector_name, options = {}) ⇒ Object
- .in_transform(&blk) ⇒ Object
- .map_client_to_vault(mappings) ⇒ Object
- .map_vault_to_client(mappings) ⇒ Object
- .out_transform(&blk) ⇒ Object
- .vault(connector_name, options = {}) ⇒ Object
Instance Method Summary collapse
-
#allowed_in ⇒ Object
default allowed_in in case allow_in doesn’t get called.
-
#allowed_out ⇒ Object
default allowed_out in case allow_out doesn’t get called.
-
#association_context ⇒ Object
The context for all association keys used by this pipeline.
-
#in_create(event) ⇒ Object
Override to restrict creation on the vault.
-
#in_filter(event) ⇒ Object
Default method for allowed_in.
-
#in_handler(event) ⇒ Object
Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.
-
#in_map_schema(event) ⇒ Object
If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace.
- #in_match(event) ⇒ Object
-
#in_place(event) ⇒ Object
Override to modify the target path for creation in the vault.
-
#initialize ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #map_schema(event, map) ⇒ Object
- #name ⇒ Object
-
#out_create(event) ⇒ Object
Override to restrict creation on the client.
-
#out_event_filter(event) ⇒ Object
Override to process the event generated by the publisher before any other processing is done.
-
#out_filter(event) ⇒ Object
Default method for allowed_out.
-
#out_handler(event) ⇒ Object
Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.
- #out_map_schema(event) ⇒ Object
-
#out_match(event) ⇒ Object
Override to implement some kind of matching.
-
#out_place(event) ⇒ Object
Override to modify the target path for creation on the client.
- #perform_transform(name, event, hint = "") ⇒ Object
-
#run_in_once ⇒ Object
Execute the in pipe once and then return.
-
#run_once ⇒ Object
Execute the pipeline once then return.
-
#run_out_once ⇒ Object
Execute the out pipe once and then return.
- #start ⇒ Object
- #started ⇒ Object
- #stop ⇒ Object
- #stopped ⇒ Object
Methods included from Utilities
#as_array, #base_path, #call_if_exists, #connector_called, #effective_operations, #ensure_dir_exists, #find_base_path, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #perform_operations, #pipeline_called, #set_preference, #something_called, #with_rescue
Constructor Details
#initialize ⇒ BasePipeline
Returns a new instance of BasePipeline.
45 46 47 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 45 def initialize @delay = 5 end |
Instance Attribute Details
#delay ⇒ Object
delay in seconds between checking connectors
43 44 45 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 43 def delay @delay end |
Class Method Details
.allow_in(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
360 361 362 363 364 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 360 def self.allow_in *fields class_def 'allowed_in' do fields.map {|f| f.to_s} end end |
.allow_out(*fields) ⇒ Object
Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.
382 383 384 385 386 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 382 def self.allow_out *fields class_def 'allowed_out' do fields.map {|f| f.to_s } end end |
.client(connector_name, options = {}) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 53 def self.client(connector_name, ={}) class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name) [:name] ||= "#{self.name}(client)" [:is_vault] = false class_def 'client' do @client ||= eval("::#{class_name}").new() end end |
.in_transform(&blk) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 114 def self.in_transform &blk define_method :in_transform do |event| event. :transform, &blk event.transform end end |
.map_client_to_vault(mappings) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 75 def self.map_client_to_vault mappings remove_method :client_to_vault_map if method_defined? :client_to_vault_map class_def 'client_to_vault_map' do unless @client_to_vault_map @client_to_vault_map = {} mappings.each {|k,v| @client_to_vault_map[k.to_s] = v.to_s} end @client_to_vault_map end unless method_defined? :vault_to_client_map class_def 'vault_to_client_map' do @vault_to_client_map ||= client_to_vault_map.invert end end end |
.map_vault_to_client(mappings) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 91 def self.map_vault_to_client mappings remove_method :vault_to_client_map if method_defined? :vault_to_client_map class_def 'vault_to_client_map' do unless @vault_to_client_map @vault_to_client_map = {} mappings.each {|k,v| @vault_to_client_map[k.to_s] = v.to_s} end @vault_to_client_map end unless method_defined? :client_to_vault_map class_def 'client_to_vault_map' do @client_to_vault_map ||= vault_to_client_map.invert end end end |
.out_transform(&blk) ⇒ Object
107 108 109 110 111 112 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 107 def self.out_transform &blk define_method :out_transform do |event| event. :transform, &blk event.transform end end |
.vault(connector_name, options = {}) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 62 def self.vault(connector_name, ={}) class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name) [:name] ||= "#{self.name}(vault)" [:is_vault] = true class_def 'vault' do unless @vault @vault = eval("::" + class_name).new() @vault.pipeline = self end @vault end end |
Instance Method Details
#allowed_in ⇒ Object
default allowed_in in case allow_in doesn’t get called
367 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 367 def allowed_in; nil; end |
#allowed_out ⇒ Object
default allowed_out in case allow_out doesn’t get called
389 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 389 def allowed_out; nil; end |
#association_context ⇒ Object
The context for all association keys used by this pipeline. By default, defer to the client
322 323 324 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 322 def association_context @client.association_context end |
#in_create(event) ⇒ Object
Override to restrict creation on the vault
198 199 200 201 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 198 def in_create event log.debug "Create allowed through default rule" true end |
#in_filter(event) ⇒ Object
Default method for allowed_in. Override by calling allow_in def allowed_in; false; end
371 372 373 374 375 376 377 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 371 def in_filter(event) if allowed_in event.drop_all_but_changes_to allowed_in else event end end |
#in_handler(event) ⇒ Object
Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 284 def in_handler(event) event.retrieve_association(association_context) hint = " (#{client.name} => #{vault.name})" log.info "Processing incoming #{event.type} event"+hint log.info YAML.dump(event) perform_transform :in_map_schema, event, hint perform_transform :in_transform, event, hint perform_transform :in_filter, event, hint # The client can't really know whether its an add or a modify because it doesn't store # the association. if event.type == :modify event.convert_to_add unless event.associated? and vault.find_associated(event.association) elsif event.type == :add and event.associated? and vault.find_associated(event.association) event.convert_to_modify end if event.type == :add match = in_match(event) # exactly one event record in the vault matched if match event.merge(match) return end if in_create(event) perform_transform :in_place, event, hint else return end end with_rescue("#{vault.name}: Processing command") {vault.process(event)} end |
#in_map_schema(event) ⇒ Object
If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace
334 335 336 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 334 def in_map_schema event map_schema event, client_to_vault_map if respond_to? :client_to_vault_map end |
#in_match(event) ⇒ Object
326 327 328 329 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 326 def in_match event log.debug "Default match rule - source path exists in vault" vault.respond_to?('[]') and vault[event.source_path] end |
#in_place(event) ⇒ Object
Override to modify the target path for creation in the vault
210 211 212 213 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 210 def in_place(event) log.debug "Default placement rule target_path = source_path" event.target_path = event.source_path end |
#map_schema(event, map) ⇒ Object
342 343 344 345 346 347 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 342 def map_schema event, map return unless map and event.payload event.payload.each do |op| op.subject = map[op.subject] || op.subject if op.subject end end |
#name ⇒ Object
49 50 51 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 49 def name self.class.name end |
#out_create(event) ⇒ Object
Override to restrict creation on the client
192 193 194 195 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 192 def out_create event log.debug "Create allowed through default rule" true end |
#out_event_filter(event) ⇒ Object
Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.
281 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 281 def out_event_filter(event); true; end |
#out_filter(event) ⇒ Object
Default method for allowed_out. Override by calling allow_in def allowed_out; false; end
393 394 395 396 397 398 399 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 393 def out_filter(event) if allowed_out event.drop_all_but_changes_to allowed_out else event end end |
#out_handler(event) ⇒ Object
Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 124 def out_handler(event) event.retrieve_association(association_context) event.convert_to_modify if event.associated? and event.type == :add hint = " (#{vault.name} => #{client.name})" log.info "Processing out-going #{event.type} event #{hint}" log.info YAML.dump(event) return unless out_event_filter event # Remove unwanted attributes perform_transform :out_filter, event unless event.associated? if [:delete, :remove_association].include? event.type log.info "#{name}: No action for #{event.type} of unassociated entry" log.info YAML.dump(event) return end end if event.type == :modify unless event.associated? and client.has_entry_for_key?(event.association.key) event.convert_to_add end end if event.type == :add match = out_match(event) log.info "Attempting to match" if match # exactly one event record on the client matched log.info "Match found, merging" event.merge(match) association = Association.new(self.association_context, match.src_path) vault.associate asssociation, event.source_path return end log.info "No match found, creating" return unless out_create(event) perform_transform :out_place, event end perform_transform :out_map_schema, event perform_transform :out_transform, event association_key = nil with_rescue("#{client.name}: Processing command") do association_key = client.process(event) end if association_key association = Association.new(association_context, association_key) with_rescue("#{client.name}: Storing association #{association} in vault") do vault.associate(association, event.source_path) end end end |
#out_map_schema(event) ⇒ Object
338 339 340 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 338 def out_map_schema event map_schema event, vault_to_client_map if respond_to? :vault_to_client_map end |
#out_match(event) ⇒ Object
Override to implement some kind of matching
185 186 187 188 189 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 185 def out_match event log.debug "Default matching rule - source path exists on client?" client.respond_to?('[]') and client[event.source_path] false end |
#out_place(event) ⇒ Object
Override to modify the target path for creation on the client
204 205 206 207 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 204 def out_place(event) log.debug "Default placement rule target_path = source_path" event.target_path = event.source_path end |
#perform_transform(name, event, hint = "") ⇒ Object
215 216 217 218 219 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 215 def perform_transform name, event, hint="" call_if_exists name, event, hint event.commit_changes log_progress name, event, hint end |
#run_in_once ⇒ Object
Execute the in pipe once and then return
245 246 247 248 249 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 245 def run_in_once log.debug "Running #{name} 'in' pipeline once" client.once_only = true client.start {|event| in_handler(event)} end |
#run_once ⇒ Object
Execute the pipeline once then return.
226 227 228 229 230 231 232 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 226 def run_once log.info "Running #{name} pipeline once" started run_in_once run_out_once stopped end |
#run_out_once ⇒ Object
Execute the out pipe once and then return
252 253 254 255 256 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 252 def run_out_once log.debug "Running #{name} 'out' pipeline once" vault.once_only = true vault.start {|event| out_handler(event)} end |
#start ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 258 def start log.info "Starting #{name} pipeline" @running = true trap("SIGINT") {self.stop} started while @running run_in_once run_out_once sleep delay end stopped log.info "#{name} stopped." end |
#started ⇒ Object
234 235 236 237 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 234 def started client.started vault.started end |
#stop ⇒ Object
272 273 274 275 276 277 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 272 def stop log.info "#{name} stopping..." @running = false Thread.main.run # i thought this would wake the thread from its sleep # but it seems to have no effect. end |
#stopped ⇒ Object
239 240 241 242 |
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 239 def stopped client.stopped vault.stopped end |