Class: RubySync::Pipelines::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Utilities
Defined in:
lib/ruby_sync/pipelines/base_pipeline.rb

Overview

This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.

One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).

We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#as_array, #call_if_exists, #connector_called, #effective_operations, #ensure_dir_exists, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #perform_operations, #pipeline_called, #set_preference, #something_called, #with_rescue

Constructor Details

#initialize ⇒ BasePipeline

Returns a new instance of BasePipeline.



50
51
52
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 50

# Creates a pipeline with the polling delay preset to 5.
# The value is exposed through #delay and is what #start passes to `sleep`
# between passes, so it is a number of seconds.
def initialize
  @delay = 5
end

Instance Attribute Details

#delay ⇒ Object

delay in seconds between checking connectors



44
45
46
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 44

# Returns the delay, in seconds, that #start sleeps between passes over the
# connectors. Set to 5 by #initialize.
def delay
  @delay
end

Class Method Details

.allow_in(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter. If nil (the default), all fields are allowed.



426
427
428
429
430
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 426

# DSL macro: declares the fields that may pass the incoming filter.
#
# Defines (through class_def) an +allowed_in+ instance method that returns
# the given field names converted to strings. The conversion happens on each
# call of the generated method.
#
# fields:: any number of field names (symbols or strings)
def self.allow_in(*fields)
  class_def('allowed_in') { fields.map(&:to_s) }
end

.allow_out(*fields) ⇒ Object

Specify which fields will be allowed through the outgoing filter. If nil (the default), all fields are allowed.



443
444
445
446
447
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 443

# DSL macro: declares the fields that may pass the outgoing filter.
#
# Defines (through class_def) an +allowed_out+ instance method that returns
# the given field names converted to strings. The conversion happens on each
# call of the generated method.
#
# fields:: any number of field names (symbols or strings)
def self.allow_out(*fields)
  class_def('allowed_out') { fields.map(&:to_s) }
end

.client(connector_name, options = {}) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 58

# DSL macro: declares the client connector for this pipeline.
#
# Resolves the connector class name through BaseConnector.class_name_for,
# fills in +options+ (a default :name derived from the pipeline class name,
# and :is_vault forced to false), then defines a +client+ instance method
# that builds the connector on first use and memoizes it in @client.
#
# Note that the caller's +options+ hash is modified in place.
#
# connector_name:: name handed to BaseConnector.class_name_for
# options::        hash passed to the connector's constructor
def self.client(connector_name, options={})
  connector_class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{name}(client)"
  options[:is_vault] = false
  # NOTE(review): the class is looked up with eval on the string returned by
  # class_name_for; that is only safe while connector names come from
  # trusted pipeline definitions.
  class_def('client') { @client ||= eval(connector_class_name).new(options) }
end

.event_method(name, &blk) ⇒ Object



90
91
92
93
94
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 90

def self.event_method name,&blk
  define_method name do |event|
    event.instance_eval &blk
  end
end

.in_create(&blk) ⇒ Object



85
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 85

# DSL macro: installs +blk+ as the in_create rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.in_create(&blk)
  event_method(:in_create, &blk)
end

.in_match(&blk) ⇒ Object



83
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 83

# DSL macro: installs +blk+ as the in_match rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.in_match(&blk)
  event_method(:in_match, &blk)
end

.in_place(&blk) ⇒ Object



87
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 87

# DSL macro: installs +blk+ as the in_place rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.in_place(&blk)
  event_method(:in_place, &blk)
end

.in_transform(&blk) ⇒ Object



81
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 81

# DSL macro: installs +blk+ as the in_transform rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.in_transform(&blk)
  event_method(:in_transform, &blk)
end

.out_create(&blk) ⇒ Object



86
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 86

# DSL macro: installs +blk+ as the out_create rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.out_create(&blk)
  event_method(:out_create, &blk)
end

.out_match(&blk) ⇒ Object



84
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 84

# DSL macro: installs +blk+ as the out_match rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.out_match(&blk)
  event_method(:out_match, &blk)
end

.out_place(&blk) ⇒ Object



88
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 88

# DSL macro: installs +blk+ as the out_place rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.out_place(&blk)
  event_method(:out_place, &blk)
end

.out_transform(&blk) ⇒ Object



82
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 82

# DSL macro: installs +blk+ as the out_transform rule by delegating to
# event_method, so the block is evaluated in the context of the event.
def self.out_transform(&blk)
  event_method(:out_transform, &blk)
end

.vault(connector_name, options = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 67

# DSL macro: declares the identity-vault connector for this pipeline.
#
# Resolves the connector class name through BaseConnector.class_name_for,
# fills in +options+ (a default :name derived from the pipeline class name,
# and :is_vault forced to true), then defines a +vault+ instance method.
# On first use that method builds the connector, stores it in @vault and
# assigns the pipeline instance to the connector's +pipeline+ attribute;
# later calls return the stored connector.
#
# Note that the caller's +options+ hash is modified in place.
#
# connector_name:: name handed to BaseConnector.class_name_for
# options::        hash passed to the connector's constructor
def self.vault(connector_name, options={})
  connector_class_name = RubySync::Connectors::BaseConnector.class_name_for(connector_name)
  options[:name] ||= "#{name}(vault)"
  options[:is_vault] = true
  class_def('vault') do
    # Already built: hand back the memoized connector.
    next @vault if @vault
    # NOTE(review): class lookup goes through eval on the string returned by
    # class_name_for; safe only for trusted pipeline definitions.
    @vault = eval(connector_class_name).new(options)
    @vault.pipeline = self
    @vault
  end
end

Instance Method Details

#allowed_in ⇒ Object

default allowed_in in case allow_in doesn’t get called



433
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 433

# Fallback used when the allow_in macro was never called: returns nil,
# meaning no explicit incoming field list has been declared.
def allowed_in
  nil
end

#allowed_out ⇒ Object

default allowed_out in case allow_out doesn’t get called



450
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 450

# Fallback used when the allow_out macro was never called: returns nil,
# meaning no explicit outgoing field list has been declared.
def allowed_out
  nil
end

#association_context ⇒ Object

The context for all association keys used by this pipeline. By default, defer to the client



392
393
394
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 392

# The context for all association keys used by this pipeline.
# By default this defers to the client connector.
#
# The connector is reached through the +client+ accessor rather than the
# @client instance variable: the accessor generated by the +client+ macro
# builds the connector lazily, so @client is still nil until the accessor
# has been called at least once.
def association_context
  client.association_context
end

#default_create(event) ⇒ Object Also known as: in_create, out_create

Override to restrict creation on the client



109
110
111
112
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 109

# Default creation rule: logs at debug level and permits the create by
# returning true. Override to restrict creation.
#
# event:: the event being considered (not inspected by the default rule)
def default_create(event)
  log.debug("Create allowed through default rule")
  true
end

#default_place(event) ⇒ Object Also known as: in_place, out_place

Override to modify the target path for creation on the client



117
118
119
120
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 117

# Default placement rule: logs at debug level and returns the event's
# source_path unchanged, so the target path equals the source path.
# Override to compute a different target path.
#
# event:: the event being placed; must respond to +source_path+
def default_place(event)
  log.debug("Default placement: same as source_path")
  placement = event.source_path
  placement
end

#in_filter(event) ⇒ Object



436
437
438
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 436

def in_filter(event)
    allowed_in == [] or event.drop_all_but_changes_to(allowed_in || [])
end

#in_handler(event) ⇒ Object

Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client. Note: The client can’t really know whether the event is an add or a modify because it doesn’t store the association.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 204

# Processes one event coming from the client and applies it to the vault.
#
# Steps, in order:
# 1. Point the event at the vault and load its association for this
#    pipeline's association context.
# 2. Run the :in_filter transform.
# 3. Unless the event is a :disassociate, look up the associated vault
#    entry; if there is none, ask in_match for a matching vault path and,
#    when one is found, record a new association for it.
# 4. Reconcile the event type with what exists: an :add for an existing
#    entry becomes a modify, a :modify with no entry becomes an add.
# 5. Run the :in_transform transform.
# 6. For an add, consult in_create and (if allowed) run
#    :in_place_transform; for a modify, merge with the associated entry;
#    any other event type is dropped unless it is associated.
# 7. Hand the event to vault.process inside with_rescue.
#
# event:: the event produced by the client connector
def in_handler(event)
  # Use the accessor, not @vault: the connector is created lazily by the
  # accessor, so the instance variable may still be nil at this point.
  event.target = vault
  event.retrieve_association(association_context)

  log.info "Processing incoming #{event.type} event #{event.hint}"
  perform_transform :in_filter, event, event.hint

  associated_entry = nil
  unless event.type == :disassociate
    associated_entry = vault.find_associated(event.association) if event.associated?
    unless associated_entry
      match = in_match(event)
      if match
        log.info("Matching entry found for unassociated event: '#{match}'. Creating association.")
        # The association key is the client-side path; the vault records it
        # against the matched vault path.
        event.association = Association.new(association_context, event.source_path)
        vault.associate event.association, match
        associated_entry = vault[match]
      end
    end
  end

  if associated_entry
    if event.type == :add
      log.info "Associated entry in vault for add event. Converting to modify"
      event.convert_to_modify
    end
  elsif event.type == :modify
    log.info "No associated entry in vault for modify event. Converting to add"
    event.convert_to_add
  end

  perform_transform :in_transform, event, event.hint

  case event.type
  when :add
    if in_create(event)
      perform_transform :in_place_transform, event, event.hint
      log.info "Create on vault allowed. Placing at #{event.target_path}"
    else
      log.info "Create rule disallowed creation"
      log.info "---\n"; return
    end
  when :modify
    event.merge(associated_entry)
  else
    unless event.associated?
      log.info "No associated entry in vault for #{event.type} event. Dropping"
      log.info "---\n"; return
    end
  end

  with_rescue("#{vault.name}: Processing command") {vault.process(event)}
  log.info "---\n"
end

#in_map_schema(event) ⇒ Object

If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace



400
401
402
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 400

# Renames payload fields from client names to vault names.
#
# Only acts when this pipeline responds to +client_to_vault_map+; in that
# case the event and that map are passed to map_schema. Otherwise returns
# nil without touching the event.
def in_map_schema(event)
  return unless respond_to?(:client_to_vault_map)

  map_schema(event, client_to_vault_map)
end

#in_match(event) ⇒ Object



96
97
98
99
100
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 96

# Default incoming matching rule.
#
# Computes the vault path for the event with in_place and returns that path
# when the vault supports +[]+ and has an entry there. Returns false when
# the vault does not respond to +[]+, or the falsy lookup result when no
# entry exists at the path.
def in_match(event)
  log.debug("Default matching rule - vault[in_place] exists?")
  candidate = in_place(event)
  vault.respond_to?('[]') && vault[candidate] && candidate
end

#in_place_transform(event) ⇒ Object



124
125
126
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 124

# Sets the event's target_path to whatever in_place returns for it and
# returns that path.
def in_place_transform(event)
  destination = in_place(event)
  event.target_path = destination
end

#map_schema(event, map) ⇒ Object



408
409
410
411
412
413
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 408

# Renames the subject of each operation in the event's payload using +map+.
#
# Does nothing (returns nil) when +map+ or the payload is missing. For every
# operation with a subject, the subject is replaced by map[subject] when
# that lookup is truthy and left as is otherwise. Returns the payload.
#
# event:: object responding to +payload+ (a collection of operations)
# map::   lookup from source field name to target field name
def map_schema(event, map)
  return unless map && event.payload

  event.payload.each do |operation|
    current = operation.subject
    next unless current

    operation.subject = map[current] || current
  end
end

#name ⇒ Object



54
55
56
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 54

# The pipeline's name, which is the name of its class. Used in log messages.
def name
  self.class.name
end

#out_event_filter(event) ⇒ Object

Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.



199
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 199

# Hook for inspecting an outgoing event before any other processing.
# The default accepts every event by returning true; an override may return
# false to veto the event.
def out_event_filter(event)
  true
end

#out_filter(event) ⇒ Object



452
453
454
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 452

def out_filter(event)
    allowed_out == [] or event.drop_all_but_changes_to(allowed_out || [])
end

#out_handler(event) ⇒ Object

Called by the vault connector (see run_out_once) to process events generated by the identity vault. Note: an add is converted to a modify when an associated or matching entry already exists on the client, and a modify with no such entry is converted to an add.



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 263

# Processes one event coming from the vault and applies it to the client.
#
# Steps, in order:
# 1. Point the event at the client and load its association for this
#    pipeline's association context.
# 2. Run the :out_filter transform.
# 3. Unless the event is a :disassociate, look up the client entry for the
#    event's association key; if there is none, ask out_match for a
#    matching client path and, when one is found, record a new association.
# 4. Reconcile the event type with what exists: an :add for an existing
#    entry becomes a modify, a :modify with no entry becomes an add.
# 5. Run the :out_transform transform.
# 6. For an add, consult out_create and (if allowed) run
#    :out_place_transform; for a modify, merge with the associated entry;
#    any other event type is dropped unless it is associated.
# 7. Hand the event to client.process inside with_rescue.
#
# event:: the event produced by the vault connector
def out_handler(event)
  # Use the accessor, not @client: the connector is created lazily by the
  # accessor, so the instance variable may still be nil at this point.
  event.target = client
  event.retrieve_association(association_context)

  log.info "Processing outgoing #{event.type} event #{event.hint}"
  perform_transform :out_filter, event, event.hint

  associated_entry = nil
  unless event.type == :disassociate
    associated_entry = client.entry_for_own_association_key(event.association.key) if event.associated?
    unless associated_entry
      match = out_match(event)
      if match
        log.info("Matching entry found for unassociated event: '#{match}'. Creating association.")
        # The association key is the matched client-side path; the vault
        # records it against the event's own (vault) source path.
        event.association = Association.new(association_context, match)
        vault.associate event.association, event.source_path
        associated_entry = client[match]
      end
    end
  end

  if associated_entry
    if event.type == :add
      log.info "Associated entry in client for add event. Converting to modify"
      event.convert_to_modify
    end
  elsif event.type == :modify
    log.info "No associated entry in client for modify event. Converting to add"
    event.convert_to_add
  end

  perform_transform :out_transform, event, event.hint

  case event.type
  when :add
    if out_create(event)
      perform_transform :out_place_transform, event, event.hint
      log.info "Create on client allowed. Placing at #{event.target_path}"
    else
      log.info "Create rule disallowed creation"
      log.info "---\n"; return
    end
  when :modify
    event.merge(associated_entry)
  else
    unless event.associated?
      log.info "No associated entry in client for #{event.type} event. Dropping"
      log.info "---\n"; return
    end
  end

  with_rescue("#{client.name}: Processing command") {client.process(event)}
  log.info "---\n"
end

#out_map_schema(event) ⇒ Object



404
405
406
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 404

# Renames payload fields from vault names to client names.
#
# Only acts when this pipeline responds to +vault_to_client_map+; in that
# case the event and that map are passed to map_schema. Otherwise returns
# nil without touching the event.
def out_map_schema(event)
  return unless respond_to?(:vault_to_client_map)

  map_schema(event, vault_to_client_map)
end

#out_match(event) ⇒ Object



102
103
104
105
106
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 102

# Default outgoing matching rule.
#
# Computes the client path for the event with out_place and returns that
# path when the client supports +[]+ and has an entry there. Returns false
# when the client does not respond to +[]+, or the falsy lookup result when
# no entry exists at the path.
def out_match(event)
  log.debug("Default matching rule - client[out_place] exists?")
  candidate = out_place(event)
  client.respond_to?('[]') && client[candidate] && candidate
end

#out_place_transform(event) ⇒ Object



128
129
130
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 128

# Sets the event's target_path to whatever out_place returns for it and
# returns that path.
def out_place_transform(event)
  destination = out_place(event)
  event.target_path = destination
end

#perform_transform(name, event, hint = "") ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 132

# Runs one named transform stage against an event.
#
# Logs the stage name, optionally dumps the event as YAML beforehand (when
# the stage is listed in dump_before), invokes the stage through
# call_if_exists, commits the event's changes, then optionally dumps the
# event again (when the stage is listed in dump_after).
#
# name::  stage name (symbol or string), e.g. :in_filter
# event:: the event being transformed
# hint::  free-form text forwarded to call_if_exists
def perform_transform(name, event, hint="")
  log.info("Performing #{name}")
  log.info(event.to_yaml) if dump_before.include?(name.to_sym)
  call_if_exists(name, event, hint)
  event.commit_changes
  log.info(event.to_yaml) if dump_after.include?(name.to_sym)
end

#run_in_onceObject

Execute the in pipe once and then return



161
162
163
164
165
166
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 161

# Runs the incoming half of the pipeline a single time.
#
# Skipped entirely (returns nil) when allowed_in is nil or false. Otherwise
# the client connector is flagged once_only and started, with every event it
# yields passed to in_handler. Returns whatever client.start returns.
def run_in_once
  if allowed_in
    log.debug("Running #{name} 'in' pipeline once")
    client.once_only = true
    client.start { |incoming| in_handler(incoming) }
  end
end

#run_onceObject

Execute the pipeline once then return.



142
143
144
145
146
147
148
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 142

# Executes the pipeline once (incoming pass, then outgoing pass) and
# returns the result of #stopped.
#
# The connectors are told they have started before the passes and are
# always told they have stopped afterwards, even when a pass raises; in
# that case the original exception is re-raised after #stopped has run.
def run_once
  log.info "Running #{name} pipeline once"
  started
  begin
    run_in_once
    run_out_once
  rescue Exception
    # Pair every `started` with a `stopped`, then let the error propagate.
    stopped
    raise
  end
  stopped
end

#run_out_onceObject

Execute the out pipe once and then return



169
170
171
172
173
174
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 169

# Runs the outgoing half of the pipeline a single time.
#
# Skipped entirely (returns nil) when allowed_out is nil or false. Otherwise
# the vault connector is flagged once_only and started, with every event it
# yields passed to out_handler. Returns whatever vault.start returns.
def run_out_once
  if allowed_out
    log.debug("Running #{name} 'out' pipeline once")
    vault.once_only = true
    vault.start { |outgoing| out_handler(outgoing) }
  end
end

#startObject



176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 176

# Runs the pipeline continuously until #stop is called.
#
# Installs a SIGINT handler that calls #stop, notifies the connectors via
# #started, then alternates one incoming and one outgoing pass, sleeping
# +delay+ seconds between rounds, for as long as @running is true.
# #stopped is always invoked when the loop ends, including when a pass
# raises, so connectors get their shutdown callback either way.
def start
  log.info "Starting #{name} pipeline"
  @running = true
  trap("SIGINT") { stop }
  started
  begin
    while @running
      run_in_once
      run_out_once
      sleep delay
    end
  ensure
    # Pair every `started` with a `stopped`, even on an unexpected error.
    stopped
  end
  log.info "#{name} stopped."
end

#startedObject



150
151
152
153
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 150

# Notifies both connectors that the pipeline has started: the client first,
# then the vault. The accessors build their connectors on first use, so this
# is also typically where the connectors get instantiated.
def started
  client.started
  vault.started
end

#stopObject



190
191
192
193
194
195
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 190

# Asks a running pipeline to stop: logs the request and clears @running so
# the loop in #start exits after its current round. Installed by #start as
# the SIGINT handler.
def stop
  log.info "#{name} stopping..."
  @running = false
  Thread.main.run # Intended to wake the main thread from its sleep in #start,
  # but (per the original author) it seems to have no effect.
end

#stoppedObject



155
156
157
158
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 155

# Notifies both connectors that the pipeline has stopped: the client first,
# then the vault.
def stopped
  client.stopped
  vault.stopped
end