Class: RubySync::Pipelines::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Utilities
Defined in:
lib/ruby_sync/pipelines/base_pipeline.rb

Overview

This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.

One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This is could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).

We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#base_path, #call_if_exists, #connector_called, #ensure_dir_exists, #find_base_path, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #pipeline_called, #set_preference, #something_called, #with_rescue

Constructor Details

#initializeBasePipeline

Returns a new instance of BasePipeline.



43
44
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 43

def initialize
end

Class Method Details

.allow_in(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.



322
323
324
325
326
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 322

def self.allow_in *fields
  class_def 'allowed_in' do
    fields.map {|f| f.to_s}
  end
end

.allow_out(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.



344
345
346
347
348
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 344

def self.allow_out *fields
  class_def 'allowed_out' do
    fields.map {|f| f.to_s }
  end
end

.client(connector_name, options = {}) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 50

def self.client(connector_name, options={})
  class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{self.name}(client)"
  options[:is_vault] = false
  class_def 'client' do
    @client ||= eval("::#{class_name}").new(options)
  end
end

.in_transform(&blk) ⇒ Object



107
108
109
110
111
112
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 107

def self.in_transform &blk
  define_method :in_transform do |event|
    event.meta_def :transform, &blk
    event.transform
  end
end

.map_client_to_vault(mappings) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 68

def self.map_client_to_vault mappings
  remove_method :client_to_vault_map if method_defined? :client_to_vault_map
  class_def 'client_to_vault_map' do
    unless @client_to_vault_map
      @client_to_vault_map = {}
      mappings.each {|k,v| @client_to_vault_map[k.to_s] = v.to_s}
    end
    @client_to_vault_map
  end
  unless method_defined? :vault_to_client_map
    class_def 'vault_to_client_map' do
      @vault_to_client_map ||= client_to_vault_map.invert
    end
  end
end

.map_vault_to_client(mappings) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 84

def self.map_vault_to_client mappings
  remove_method :vault_to_client_map if method_defined? :vault_to_client_map
  class_def 'vault_to_client_map' do
    unless @vault_to_client_map
      @vault_to_client_map = {}
      mappings.each {|k,v| @vault_to_client_map[k.to_s] = v.to_s}
    end
    @vault_to_client_map
  end
  unless method_defined? :client_to_vault_map
    class_def 'client_to_vault_map' do
      @client_to_vault_map ||= vault_to_client_map.invert
    end
  end
end

.out_transform(&blk) ⇒ Object



100
101
102
103
104
105
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 100

def self.out_transform &blk
  define_method :out_transform do |event|
    event.meta_def :transform, &blk
    event.transform
  end
end

.vault(connector_name, options = {}) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 59

def self.vault(connector_name, options={})
  class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{self.name}(vault)"
  options[:is_vault] = true
  class_def 'vault' do
    @vault ||= eval("::" + class_name).new(options)
  end
end

Instance Method Details

#allowed_inObject

default allowed_in in case allow_in doesn’t get called



329
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 329

def allowed_in; nil; end

#allowed_outObject

default allowed_out in case allow_out doesn’t get called



351
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 351

def allowed_out; nil; end

#association_contextObject

The context for all association keys used by this pipeline. By default, defer to the client



284
285
286
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 284

def association_context
  @client.association_context
end

#in_create(event) ⇒ Object

Override to restrict creation on the vault



191
192
193
194
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 191

def in_create event
  log.debug "Create allowed through default rule"
  true
end

#in_filter(event) ⇒ Object

Default method for allowed_in. Override by calling allow_in def allowed_in; false; end



333
334
335
336
337
338
339
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 333

def in_filter(event)
  if allowed_in
    event.drop_all_but_changes_to allowed_in
  else
    event
  end
end

#in_handler(event) ⇒ Object

Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 246

def in_handler(event)
  event.retrieve_association(association_context)        

  hint = " (#{client.name} => #{vault.name})"
  log.info "Processing incoming #{event.type} event"+hint
  log.info YAML.dump(event)
  perform_transform :in_map_schema, event, hint
  perform_transform :in_transform, event, hint
  perform_transform :in_filter, event, hint
  
  # The client can't really know whether its an add or a modify because it doesn't store
  # the association.
  if event.type == :modify
    event.convert_to_add unless event.associated? and vault.find_associated(event.association)
  elsif event.type == :add and event.associated? and vault.find_associated(event.association)
    event.convert_to_modify
  end
  
  if event.type == :add
    match = in_match(event) # exactly one event record in the vault matched
    if match
      event.merge(match)
      return
    end
    
    if in_create(event)
      perform_transform :in_place, event, hint
    else
      return
    end
  end
  
  with_rescue("#{vault.name}: Processing command") {vault.process(event)}
  
end

#in_map_schema(event) ⇒ Object

If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace



296
297
298
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 296

def in_map_schema event
  map_schema event, client_to_vault_map if respond_to? :client_to_vault_map
end

#in_match(event) ⇒ Object



288
289
290
291
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 288

def in_match event
  log.debug "Default match rule - source path exists in vault"
  vault.respond_to?(:'[]') and vault[event.source_path]
end

#in_place(event) ⇒ Object

Override to modify the target path for creation in the vault



203
204
205
206
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 203

def in_place(event)
  log.debug "Default placement rule target_path = source_path"
  event.target_path = event.source_path
end

#map_schema(event, map) ⇒ Object



304
305
306
307
308
309
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 304

def map_schema event, map
  return unless map and event.payload
  event.payload.each do |op|
    op.subject = map[op.subject] || op.subject if op.subject
  end
end

#nameObject



46
47
48
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 46

def name
  self.class.name
end

#out_create(event) ⇒ Object

Override to restrict creation on the client



185
186
187
188
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 185

def out_create event
  log.debug "Create allowed through default rule"
  true
end

#out_event_filter(event) ⇒ Object

Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.



243
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 243

def out_event_filter(event);  true;  end

#out_filter(event) ⇒ Object

Default method for allowed_out. Override by calling allow_in def allowed_out; false; end



355
356
357
358
359
360
361
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 355

def out_filter(event)
  if allowed_out
    event.drop_all_but_changes_to allowed_out
  else
    event
  end
end

#out_handler(event) ⇒ Object

Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 117

def out_handler(event)

  event.retrieve_association(association_context)
  event.convert_to_modify if event.associated? and event.type == :add
  
  hint = " (#{vault.name} => #{client.name})"
  log.info "Processing out-going #{event.type} event #{hint}"
  log.info YAML.dump(event)
  return unless out_event_filter event
  
  # Remove unwanted attributes
  perform_transform :out_filter, event

  unless event.associated?
    if [:delete, :remove_association].include? event.type
      log.info "#{name}: No action for #{event.type} of unassociated entry"
      log.info YAML.dump(event)
      return
    end
  end

  if event.type == :modify
    unless event.associated? and client.has_entry_for_key?(event.association.key)
      event.convert_to_add
    end
  end

  if event.type == :add
    match = out_match(event) 
    log.info "Attempting to match"
    if match # exactly one event record on the client matched
      log.info "Match found, merging"
      event.merge(match)
      association = Association.new(self.association_context, match.src_path)
      vault.associate asssociation, event.source_path
      return
    end
    log.info "No match found, creating"
    return unless out_create(event)
    perform_transform :out_place, event
  end
  
  perform_transform :out_map_schema, event
  perform_transform :out_transform, event
  association_key = nil
  with_rescue("#{client.name}: Processing command") do
    association_key = client.process(event)
  end
  if association_key
    association = Association.new(association_context, association_key)
    with_rescue("#{client.name}: Storing association #{association} in vault") do
      vault.associate(association, event.source_path)
    end
  end
end

#out_map_schema(event) ⇒ Object



300
301
302
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 300

def out_map_schema event
  map_schema event, vault_to_client_map if respond_to? :vault_to_client_map
end

#out_match(event) ⇒ Object

Override to implement some kind of matching



178
179
180
181
182
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 178

def out_match event
  log.debug "Default matching rule - source path exists on client?"
  client.respond_to?(:'[]') and client[event.source_path]
  false
end

#out_place(event) ⇒ Object

Override to modify the target path for creation on the client



197
198
199
200
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 197

def out_place(event)
  log.debug "Default placement rule target_path = source_path"
  event.target_path = event.source_path
end

#perform_transform(name, event, hint = "") ⇒ Object



208
209
210
211
212
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 208

def perform_transform name, event, hint=""
  call_if_exists name, event, hint
  event.commit_changes
  log_progress name, event, hint
end

#run_in_onceObject

Execute the in pipe once and then return



227
228
229
230
231
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 227

def run_in_once
  log.info "Running #{name} 'in' pipeline once"
  client.once_only = true
  client.start {|event| in_handler(event)}
end

#run_onceObject

Execute the pipeline once then return. TODO Consider making this run in and out simultaneously



220
221
222
223
224
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 220

def run_once
  log.info "Running #{name} pipeline once"
  run_in_once
  run_out_once
end

#run_out_onceObject

Execute the out pipe once and then return



234
235
236
237
238
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 234

def run_out_once
  log.info "Running #{name} 'out' pipeline once"
  vault.once_only = true
  vault.start {|event| out_handler(event)}
end