Class: RubySync::Pipelines::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Utilities
Defined in:
lib/ruby_sync/pipelines/base_pipeline.rb

Overview

This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.

One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This is could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).

We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#as_array, #base_path, #call_if_exists, #connector_called, #effective_operations, #ensure_dir_exists, #find_base_path, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #perform_operations, #pipeline_called, #set_preference, #something_called, #with_rescue

Constructor Details

#initializeBasePipeline

Returns a new instance of BasePipeline.



45
46
47
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 45

def initialize
  @delay = 5
end

Instance Attribute Details

#delayObject

delay in seconds between checking connectors



43
44
45
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 43

def delay
  @delay
end

Class Method Details

.allow_in(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.



360
361
362
363
364
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 360

def self.allow_in *fields
  class_def 'allowed_in' do
    fields.map {|f| f.to_s}
  end
end

.allow_out(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter If nil (the default), all fields are allowed.



382
383
384
385
386
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 382

def self.allow_out *fields
  class_def 'allowed_out' do
    fields.map {|f| f.to_s }
  end
end

.client(connector_name, options = {}) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 53

def self.client(connector_name, options={})
  class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{self.name}(client)"
  options[:is_vault] = false
  class_def 'client' do
    @client ||= eval("::#{class_name}").new(options)
  end
end

.in_transform(&blk) ⇒ Object



114
115
116
117
118
119
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 114

def self.in_transform &blk
  define_method :in_transform do |event|
    event.meta_def :transform, &blk
    event.transform
  end
end

.map_client_to_vault(mappings) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 75

def self.map_client_to_vault mappings
  remove_method :client_to_vault_map if method_defined? :client_to_vault_map
  class_def 'client_to_vault_map' do
    unless @client_to_vault_map
      @client_to_vault_map = {}
      mappings.each {|k,v| @client_to_vault_map[k.to_s] = v.to_s}
    end
    @client_to_vault_map
  end
  unless method_defined? :vault_to_client_map
    class_def 'vault_to_client_map' do
      @vault_to_client_map ||= client_to_vault_map.invert
    end
  end
end

.map_vault_to_client(mappings) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 91

def self.map_vault_to_client mappings
  remove_method :vault_to_client_map if method_defined? :vault_to_client_map
  class_def 'vault_to_client_map' do
    unless @vault_to_client_map
      @vault_to_client_map = {}
      mappings.each {|k,v| @vault_to_client_map[k.to_s] = v.to_s}
    end
    @vault_to_client_map
  end
  unless method_defined? :client_to_vault_map
    class_def 'client_to_vault_map' do
      @client_to_vault_map ||= vault_to_client_map.invert
    end
  end
end

.out_transform(&blk) ⇒ Object



107
108
109
110
111
112
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 107

def self.out_transform &blk
  define_method :out_transform do |event|
    event.meta_def :transform, &blk
    event.transform
  end
end

.vault(connector_name, options = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 62

def self.vault(connector_name, options={})
  class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{self.name}(vault)"
  options[:is_vault] = true
  class_def 'vault' do
    unless @vault
      @vault = eval("::" + class_name).new(options)
      @vault.pipeline = self
    end
    @vault
  end
end

Instance Method Details

#allowed_inObject

default allowed_in in case allow_in doesn’t get called



367
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 367

def allowed_in; nil; end

#allowed_outObject

default allowed_out in case allow_out doesn’t get called



389
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 389

def allowed_out; nil; end

#association_contextObject

The context for all association keys used by this pipeline. By default, defer to the client



322
323
324
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 322

def association_context
  @client.association_context
end

#in_create(event) ⇒ Object

Override to restrict creation on the vault



198
199
200
201
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 198

def in_create event
  log.debug "Create allowed through default rule"
  true
end

#in_filter(event) ⇒ Object

Default method for allowed_in. Override by calling allow_in def allowed_in; false; end



371
372
373
374
375
376
377
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 371

def in_filter(event)
  if allowed_in
    event.drop_all_but_changes_to allowed_in
  else
    event
  end
end

#in_handler(event) ⇒ Object

Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client.



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 284

def in_handler(event)
  event.retrieve_association(association_context)        

  hint = " (#{client.name} => #{vault.name})"
  log.info "Processing incoming #{event.type} event"+hint
  log.info YAML.dump(event)
  perform_transform :in_map_schema, event, hint
  perform_transform :in_transform, event, hint
  perform_transform :in_filter, event, hint
  
  # The client can't really know whether its an add or a modify because it doesn't store
  # the association.
  if event.type == :modify
    event.convert_to_add unless event.associated? and vault.find_associated(event.association)
  elsif event.type == :add and event.associated? and vault.find_associated(event.association)
    event.convert_to_modify
  end
  
  if event.type == :add
    match = in_match(event) # exactly one event record in the vault matched
    if match
      event.merge(match)
      return
    end
    
    if in_create(event)
      perform_transform :in_place, event, hint
    else
      return
    end
  end
  
  with_rescue("#{vault.name}: Processing command") {vault.process(event)}
  
end

#in_map_schema(event) ⇒ Object

If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace



334
335
336
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 334

def in_map_schema event
  map_schema event, client_to_vault_map if respond_to? :client_to_vault_map
end

#in_match(event) ⇒ Object



326
327
328
329
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 326

def in_match event
  log.debug "Default match rule - source path exists in vault"
  vault.respond_to?('[]') and vault[event.source_path]
end

#in_place(event) ⇒ Object

Override to modify the target path for creation in the vault



210
211
212
213
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 210

def in_place(event)
  log.debug "Default placement rule target_path = source_path"
  event.target_path = event.source_path
end

#map_schema(event, map) ⇒ Object



342
343
344
345
346
347
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 342

def map_schema event, map
  return unless map and event.payload
  event.payload.each do |op|
    op.subject = map[op.subject] || op.subject if op.subject
  end
end

#nameObject



49
50
51
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 49

def name
  self.class.name
end

#out_create(event) ⇒ Object

Override to restrict creation on the client



192
193
194
195
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 192

def out_create event
  log.debug "Create allowed through default rule"
  true
end

#out_event_filter(event) ⇒ Object

Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.



281
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 281

def out_event_filter(event);  true;  end

#out_filter(event) ⇒ Object

Default method for allowed_out. Override by calling allow_in def allowed_out; false; end



393
394
395
396
397
398
399
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 393

def out_filter(event)
  if allowed_out
    event.drop_all_but_changes_to allowed_out
  else
    event
  end
end

#out_handler(event) ⇒ Object

Called by the identity-vault connector in the ‘out’ thread to process events generated by the identity vault.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 124

def out_handler(event)

  event.retrieve_association(association_context)
  event.convert_to_modify if event.associated? and event.type == :add
  
  hint = " (#{vault.name} => #{client.name})"
  log.info "Processing out-going #{event.type} event #{hint}"
  log.info YAML.dump(event)
  return unless out_event_filter event
  
  # Remove unwanted attributes
  perform_transform :out_filter, event

  unless event.associated?
    if [:delete, :remove_association].include? event.type
      log.info "#{name}: No action for #{event.type} of unassociated entry"
      log.info YAML.dump(event)
      return
    end
  end

  if event.type == :modify
    unless event.associated? and client.has_entry_for_key?(event.association.key)
      event.convert_to_add
    end
  end

  if event.type == :add
    match = out_match(event) 
    log.info "Attempting to match"
    if match # exactly one event record on the client matched
      log.info "Match found, merging"
      event.merge(match)
      association = Association.new(self.association_context, match.src_path)
      vault.associate asssociation, event.source_path
      return
    end
    log.info "No match found, creating"
    return unless out_create(event)
    perform_transform :out_place, event
  end
  
  perform_transform :out_map_schema, event
  perform_transform :out_transform, event
  association_key = nil
  with_rescue("#{client.name}: Processing command") do
    association_key = client.process(event)
  end
  if association_key
    association = Association.new(association_context, association_key)
    with_rescue("#{client.name}: Storing association #{association} in vault") do
      vault.associate(association, event.source_path)
    end
  end
end

#out_map_schema(event) ⇒ Object



338
339
340
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 338

def out_map_schema event
  map_schema event, vault_to_client_map if respond_to? :vault_to_client_map
end

#out_match(event) ⇒ Object

Override to implement some kind of matching



185
186
187
188
189
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 185

def out_match event
  log.debug "Default matching rule - source path exists on client?"
  client.respond_to?('[]') and client[event.source_path]
  false
end

#out_place(event) ⇒ Object

Override to modify the target path for creation on the client



204
205
206
207
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 204

def out_place(event)
  log.debug "Default placement rule target_path = source_path"
  event.target_path = event.source_path
end

#perform_transform(name, event, hint = "") ⇒ Object



215
216
217
218
219
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 215

def perform_transform name, event, hint=""
  call_if_exists name, event, hint
  event.commit_changes
  log_progress name, event, hint
end

#run_in_onceObject

Execute the in pipe once and then return



245
246
247
248
249
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 245

def run_in_once
  log.debug "Running #{name} 'in' pipeline once"
  client.once_only = true
  client.start {|event| in_handler(event)}
end

#run_onceObject

Execute the pipeline once then return.



226
227
228
229
230
231
232
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 226

def run_once
  log.info "Running #{name} pipeline once"
  started
  run_in_once
  run_out_once
  stopped
end

#run_out_onceObject

Execute the out pipe once and then return



252
253
254
255
256
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 252

def run_out_once
  log.debug "Running #{name} 'out' pipeline once"
  vault.once_only = true
  vault.start {|event| out_handler(event)}
end

#startObject



258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 258

def start
  log.info "Starting #{name} pipeline"
  @running = true
  trap("SIGINT") {self.stop}
  started
  while @running
    run_in_once
    run_out_once
    sleep delay
  end
  stopped
  log.info "#{name} stopped."
end

#startedObject



234
235
236
237
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 234

def started
  client.started
  vault.started
end

#stopObject



272
273
274
275
276
277
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 272

def stop
  log.info "#{name} stopping..."
  @running = false
  Thread.main.run # i thought this would wake the thread from its sleep
  # but it seems to have no effect.
end

#stoppedObject



239
240
241
242
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 239

def stopped
  client.stopped
  vault.stopped
end