Class: RubySync::Pipelines::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Utilities
Defined in:
lib/ruby_sync/pipelines/base_pipeline.rb

Overview

This pipeline is the base for those that synchronize content bi-directionally between two datastores. These are commonly used for synchronizing identity information between directories.

One of the data-stores is called the identity-vault. This is generally the central repository for identity information (typically, an LDAP server or relational database). We’ll call the other data-store the client for want of a better term. This could be anything that an EndPoint has been written for (an LDAP server, a text file, an application, etc).

We refer to the flow of events from the client to the identity-vault as incoming and those from the identity vault to the client as out-going. Methods in this class prefixed with ‘in_’ or ‘out_’ work on the incoming or outgoing flows respectively.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utilities

#as_array, #call_if_exists, #class_called, #class_for_name, #class_name_for, #connector_called, #dump_after, #dump_before, #effective_operations, #ensure_dir_exists, #get_preference, #get_preference_file_path, #include_in_search_path, #log_progress, #perform_operations, #perform_transform, #pipeline_called, #set_preference, #something_called, #with_rescue

Constructor Details

#initializeBasePipeline

Returns a new instance of BasePipeline.



49
50
51
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 49

# Sets @delay to 5. The start loop passes this value (via #delay) to sleep
# after each in/out pass.
def initialize
  @delay = 5
end

Instance Attribute Details

#delayObject

add_dump_options



46
47
48
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 46

# Reader for @delay, which initialize sets to 5 and which start hands to
# sleep between passes.
def delay
  @delay
end

Class Method Details

.allow_in(*fields) ⇒ Object

Specify which fields will be allowed through the incoming filter. If nil (the default), all fields are allowed.



454
455
456
457
458
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 454

# Class-level macro: defines (through class_def) an `allowed_in` method
# whose body returns the given field names converted to strings.
def self.allow_in(*fields)
  class_def('allowed_in') { fields.map(&:to_s) }
end

.allow_out(*fields) ⇒ Object

Specify which fields will be allowed through the outgoing filter. If nil (the default), all fields are allowed.



471
472
473
474
475
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 471

# Class-level macro: defines (through class_def) an `allowed_out` method
# whose body returns the given field names converted to strings.
def self.allow_out(*fields)
  class_def('allowed_out') { fields.map(&:to_s) }
end

.client(connector_name, options = {}) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 57

# Class-level macro declaring the connector that plays the client role.
#
# The connector class is looked up with class_called(connector_name, "connector").
# When no class is found an error is logged and nothing is defined. Otherwise
# an instance method `client` is defined (via class_def) that builds the
# connector on first use and memoizes it in @client.
#
# connector_name:: name handed to class_called
# options::        connector options; :name defaults to
#                  "<pipeline class name>(client)" and :is_vault is forced
#                  to false
def self.client(connector_name, options={})
  options = HashWithIndifferentAccess.new(options)
  connector_class = class_called(connector_name, "connector")
  unless connector_class
    # The interpolation previously lacked its opening brace, so the literal
    # text "#connector_name}" was logged instead of the connector's name.
    log.error "No connector called #{connector_name}"
    return
  end
  options[:name] ||= "#{self.name}(client)"
  options[:is_vault] = false
  class_def 'client' do
    @client ||= connector_class.new(options)
  end
end

.deprecated_event_method(name, replacement, &blk) ⇒ Object



112
113
114
115
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 112

# Logs a deprecation warning naming +name+ and its +replacement+, then
# registers +blk+ under the replacement name via event_method.
def self.deprecated_event_method(name, replacement, &blk)
  notice = "'#{name}' has been deprecated. Use '#{replacement}' instead."
  log.warn(notice)
  event_method(replacement, &blk)
end

.event_method(name, &blk) ⇒ Object



106
107
108
109
110
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 106

# Defines an instance method called +name+ that takes an event and
# evaluates +blk+ with that event as self (instance_eval).
def self.event_method(name, &blk)
  define_method(name) { |event| event.instance_eval(&blk) }
end

.in_command_transform(&blk) ⇒ Object



95
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 95

# Defines the in_command_transform instance method from +blk+ by
# delegating to event_method.
def self.in_command_transform(&blk)
  event_method(:in_command_transform, &blk)
end

.in_create(&blk) ⇒ Object



101
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 101

# Defines the in_create instance method from +blk+ by delegating to
# event_method.
def self.in_create(&blk)
  event_method(:in_create, &blk)
end

.in_event_transform(&blk) ⇒ Object



94
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 94

# Defines the in_event_transform instance method from +blk+ by delegating
# to event_method.
def self.in_event_transform(&blk)
  event_method(:in_event_transform, &blk)
end

.in_match(&blk) ⇒ Object



99
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 99

# Defines the in_match instance method from +blk+ by delegating to
# event_method.
def self.in_match(&blk)
  event_method(:in_match, &blk)
end

.in_place(&blk) ⇒ Object



103
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 103

# Defines the in_place instance method from +blk+ by delegating to
# event_method.
def self.in_place(&blk)
  event_method(:in_place, &blk)
end

.in_transform(&blk) ⇒ Object



93
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 93

# Deprecated spelling of in_event_transform: forwards +blk+ to
# deprecated_event_method with the old and new names.
def self.in_transform(&blk)
  deprecated_event_method(:in_transform, :in_event_transform, &blk)
end

.out_command_transform(&blk) ⇒ Object



98
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 98

# Defines the out_command_transform instance method from +blk+ by
# delegating to event_method.
def self.out_command_transform(&blk)
  event_method(:out_command_transform, &blk)
end

.out_create(&blk) ⇒ Object



102
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 102

# Defines the out_create instance method from +blk+ by delegating to
# event_method.
def self.out_create(&blk)
  event_method(:out_create, &blk)
end

.out_event_transform(&blk) ⇒ Object



97
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 97

# Defines the out_event_transform instance method from +blk+ by delegating
# to event_method.
def self.out_event_transform(&blk)
  event_method(:out_event_transform, &blk)
end

.out_match(&blk) ⇒ Object



100
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 100

# Defines the out_match instance method from +blk+ by delegating to
# event_method.
def self.out_match(&blk)
  event_method(:out_match, &blk)
end

.out_place(&blk) ⇒ Object



104
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 104

# Defines the out_place instance method from +blk+ by delegating to
# event_method.
def self.out_place(&blk)
  event_method(:out_place, &blk)
end

.out_transform(&blk) ⇒ Object



96
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 96

# Deprecated spelling of out_event_transform: forwards +blk+ to
# deprecated_event_method with the old and new names.
def self.out_transform(&blk)
  deprecated_event_method(:out_transform, :out_event_transform, &blk)
end

.vault(connector_name, options = {}) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 71

# Class-level macro declaring the connector that plays the vault role.
#
# The connector class is looked up with class_called(connector_name, "connector").
# When no class is found an error is logged and nothing is defined. Otherwise
# an instance method `vault` is defined (via class_def) that builds the
# connector on first use, points its `pipeline` at the calling instance and
# memoizes it in @vault.
#
# connector_name:: name handed to class_called
# options::        connector options; :name defaults to
#                  "<pipeline class name>(vault)" and :is_vault is forced
#                  to true
def self.vault(connector_name, options={})
  options = HashWithIndifferentAccess.new(options)
  connector_class = class_called(connector_name, "connector")
  unless connector_class
    log.error("No connector called #{connector_name}")
    return
  end

  options[:name] ||= "#{self.name}(vault)"
  options[:is_vault] = true

  class_def('vault') do
    # Already built on an earlier call: hand back the memoized connector.
    next @vault if @vault

    @vault = connector_class.new(options)
    @vault.pipeline = self
    @vault
  end
end

Instance Method Details

#allowed_inObject

default allowed_in in case allow_in doesn’t get called



461
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 461

# Fallback used when allow_in was never called: no field list is configured,
# so nil is returned.
def allowed_in
  nil
end

#allowed_outObject

default allowed_out in case allow_out doesn’t get called



478
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 478

# Fallback used when allow_out was never called: no field list is
# configured, so nil is returned.
def allowed_out
  nil
end

#association_contextObject

The context for all association keys used by this pipeline. By default, defer to the client



420
421
422
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 420

# The context for all association keys used by this pipeline. By default
# this defers to the client connector.
#
# The `client` accessor is used rather than the @client instance variable:
# the accessor generated by the `client` class macro only populates @client
# on its first call, so reading the variable directly fails with a
# NoMethodError on nil whenever nothing has called `client` yet.
def association_context
  client.association_context
end

#default_create(event) ⇒ Object Also known as: in_create, out_create

Override to restrict creation on the client



139
140
141
142
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 139

# Default creation rule: logs at debug level and permits the create by
# returning true.
def default_create(event)
  log.debug("Create allowed through default rule")
  true
end

#default_place(event) ⇒ Object Also known as: in_place, out_place

Override to modify the target path for creation on the client



147
148
149
150
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 147

# Default placement rule: logs at debug level and returns the event's
# source_path unchanged.
def default_place(event)
  log.debug("Default placement: same as source_path")
  event.source_path
end

#in_filter(event) ⇒ Object



464
465
466
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 464

def in_filter(event)
	allowed_in == [] or event.drop_all_but_changes_to(allowed_in || [])
end

#in_handler(event) ⇒ Object

Called by the ‘in’ connector in the ‘in’ thread to process events generated by the client. Note: The client can’t really know whether the event is an add or a modify because it doesn’t store the association.



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 225

# Processes one event flowing from the client towards the vault.
#
# Steps, in order:
# 1. Target the event at the vault and retrieve its association.
# 2. Run the :in_filter and :in_event_transform transforms.
# 3. Unless the event is a :disassociate, look for the associated vault
#    entry; failing that, try in_match and, on a match, create the
#    association.
# 4. Convert :add to modify when an entry is associated, and :modify to add
#    when none is.
# 5. For :add, consult in_create and place the entry; for :modify, merge
#    with the associated entry; any other type is dropped unless the event
#    is associated.
# 6. Run :in_command_transform and, when the event still has an effective
#    operation, have the vault process it inside with_rescue.
#
# Returns nil on the early-exit paths (create vetoed, unassociated event
# dropped); otherwise the result of the final log call.
def in_handler(event)
  # Go through the accessor rather than reading @vault: the accessor built
  # by the `vault` class macro only populates @vault on its first call, so
  # the variable can still be nil here. The rest of this method already
  # uses the accessor.
  event.target = vault
  event.retrieve_association(association_context)

  log.info "Processing incoming #{event.type} event "+event.hint
  perform_transform :in_filter, event, event.hint

  perform_transform :in_event_transform, event, event.hint

  associated_entry = nil
  unless event.type == :disassociate
    associated_entry = vault.find_associated(event.association) if event.associated?
    unless associated_entry
      match = in_match(event)
      if match
        log.info("Matching entry found for unassociated event: '#{match}'. Creating association.")
        event.association = Association.new(association_context, event.source_path)
        vault.associate event.association, match
        associated_entry = vault[match]
      else
        log.info "No match found for unassociated entry."
      end
    end
  end

  # The client cannot know whether a change is an add or a modify, so the
  # event type is corrected from what the vault actually holds.
  if associated_entry
    if event.type == :add
      log.info "Associated entry in vault for add event. Converting to modify"
      event.convert_to_modify associated_entry, allowed_in
    end
  elsif event.type == :modify
    log.info "No associated entry in vault for modify event. Converting to add"
    event.convert_to_add
  end

  case event.type
  when :add
    if in_create(event)
      perform_transform :in_place_transform, event, event.hint
      log.info "Create on vault allowed. Placing at #{event.target_path}"
    else
      log.info "Create rule disallowed creation"
      log.info "---\n"; return
    end
  when :modify
    event.merge(associated_entry)
  else
    unless event.associated?
      log.info "No associated entry in vault for #{event.type} event. Dropping"
      log.info "---\n"; return
    end
  end

  perform_transform :in_command_transform, event, event.hint

  if event.effective_operation?
    with_rescue("#{vault.name}: Processing command") {vault.process(event)}
  else
    log.info "No change."
  end
  log.info "---\n"
end

#in_map_schema(event) ⇒ Object

If client_to_vault_map is defined (usually by map_client_to_vault) then fix up the contents of the payload to refer to the fields by the names in the vault namespace



428
429
430
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 428

# Renames payload subjects using client_to_vault_map, but only when this
# object responds to client_to_vault_map; otherwise does nothing and
# returns nil.
def in_map_schema(event)
  return unless respond_to?(:client_to_vault_map)

  map_schema(event, client_to_vault_map)
end

#in_match(event) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 118

# Default incoming matching rule: if the vault supports [] lookup and
# in_place yields a path at which the vault holds an entry, that path is
# returned. Otherwise the result is nil (or whatever falsy value the vault
# lookup produced).
def in_match(event)
  log.debug("Default matching rule - vault[in_place] exists?")

  unless vault.respond_to?('[]')
    log.debug("Vault doesn't support random access - no match")
    return nil
  end

  path = in_place(event)
  return nil unless path

  log.debug("Checking for object at '#{path}' on vault.")
  vault[path] and path
end

#in_place_transform(event) ⇒ Object



154
155
156
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 154

# Stores the result of in_place(event) in event.target_path and returns
# that value.
def in_place_transform(event)
  destination = in_place(event)
  event.target_path = destination
end

#map_schema(event, map) ⇒ Object



436
437
438
439
440
441
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 436

# Rewrites the subject of every payload operation through +map+. Subjects
# without an entry in the map are kept, and operations with no subject are
# skipped. Does nothing (returns nil) when either the map or the payload is
# missing; otherwise returns the payload.
def map_schema(event, map)
  return unless map && event.payload

  event.payload.each do |operation|
    current = operation.subject
    next unless current

    operation.subject = map[current] || current
  end
end

#nameObject



53
54
55
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 53

# The pipeline's name: the name of the receiver's class.
def name
  pipeline_class = self.class
  pipeline_class.name
end

#out_event_filter(event) ⇒ Object

Override to process the event generated by the publisher before any other processing is done. Return false to veto the event.



220
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 220

# Default outgoing event filter: accepts every event by returning true.
def out_event_filter(event)
  true
end

#out_filter(event) ⇒ Object



480
481
482
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 480

def out_filter(event)
	allowed_out == [] or event.drop_all_but_changes_to(allowed_out || [])
end

#out_handler(event) ⇒ Object

Called by the ‘vault’ connector in the ‘out’ thread to process events generated by the vault.



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 290

# Processes one event flowing from the vault towards the client.
#
# Steps, in order:
# 1. Target the event at the client and retrieve its association.
# 2. Run the :out_filter and :out_event_transform transforms.
# 3. Unless the event is a :disassociate, look for the associated client
#    entry; failing that, try out_match and, on a match, record the
#    association in the vault.
# 4. Convert :add to modify when an entry is associated, and :modify to add
#    when none is.
# 5. For :add, consult out_create and place the entry; for :modify, merge
#    with the associated entry; any other type is dropped unless the event
#    is associated.
# 6. Run :out_command_transform and have the client process the event
#    inside with_rescue.
#
# Returns nil on the early-exit paths (create vetoed, unassociated event
# dropped); otherwise the result of the final log call.
def out_handler(event)
  # Go through the accessor rather than reading @client: the accessor built
  # by the `client` class macro only populates @client on its first call,
  # so the variable can still be nil here. The rest of this method already
  # uses the accessor.
  event.target = client
  event.retrieve_association(association_context)

  log.info "Processing outgoing #{event.type} event "+ event.hint
  perform_transform :out_filter, event, event.hint
  perform_transform :out_event_transform, event, event.hint

  associated_entry = nil
  unless event.type == :disassociate
    associated_entry = client.entry_for_own_association_key(event.association.key) if event.associated?
    unless associated_entry
      match = out_match(event)
      if match
        log.info("Matching entry found for unassociated event: '#{match}'. Creating association.")
        event.association = Association.new(association_context, match)
        vault.associate event.association, event.source_path
        associated_entry = client[match]
      end
    end
  end

  # Correct the event type from what the client actually holds.
  if associated_entry
    if event.type == :add
      log.info "Associated entry in client for add event. Converting to modify"
      event.convert_to_modify(associated_entry,allowed_out)
    end
  elsif event.type == :modify
    log.info "No associated entry in client for modify event. Converting to add"
    event.convert_to_add
  end

  case event.type
  when :add
    if out_create(event)
      perform_transform :out_place_transform, event, event.hint
      log.info "Create on client allowed. Placing at #{event.target_path}"
    else
      log.info "Create rule disallowed creation"
      log.info "---\n"; return
    end
  when :modify
    event.merge(associated_entry)
  else
    unless event.associated?
      log.info "No associated entry in client for #{event.type} event. Dropping"
      log.info "---\n"; return
    end
  end

  perform_transform :out_command_transform, event, event.hint

  # NOTE(review): unlike in_handler, there is no effective_operation? check
  # before processing here - confirm whether that is intentional.
  with_rescue("#{client.name}: Processing command") {client.process(event)}
  log.info "---\n"
end

#out_map_schema(event) ⇒ Object



432
433
434
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 432

# Renames payload subjects using vault_to_client_map, but only when this
# object responds to vault_to_client_map; otherwise does nothing and
# returns nil.
def out_map_schema(event)
  return unless respond_to?(:vault_to_client_map)

  map_schema(event, vault_to_client_map)
end

#out_match(event) ⇒ Object



132
133
134
135
136
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 132

# Default outgoing matching rule: if out_place yields a path, the client
# supports [] lookup and the client holds an entry at that path, the path
# is returned. Otherwise the result is falsy.
#
# As in in_match, no lookup is attempted when the placement rule yields no
# path; previously client[nil] was evaluated in that case.
def out_match(event)
  log.debug "Default matching rule - client[out_place] exists?"
  # out_place is still evaluated first, as before, in case an overriding
  # placement rule is relied on to run.
  path = out_place(event)
  return nil unless path && client.respond_to?('[]')

  client[path] and path
end

#out_place_transform(event) ⇒ Object



158
159
160
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 158

# Stores the result of out_place(event) in event.target_path and returns
# that value.
def out_place_transform(event)
  destination = out_place(event)
  event.target_path = destination
end

#run_in_onceObject

execute the in pipe once and then return



182
183
184
185
186
187
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 182

# Runs the incoming pipe a single time: puts the client in once_only mode
# and starts it, passing each event it yields to in_handler. Does nothing
# (returns nil) when allowed_in is nil or false.
def run_in_once
  return unless allowed_in

  log.debug("running #{name} 'in' pipeline once")
  client.once_only = true
  client.start do |incoming|
    in_handler(incoming)
  end
end

#run_onceObject

execute the pipeline once then return.



163
164
165
166
167
168
169
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 163

# Executes one full cycle: started, one incoming pass, one outgoing pass,
# then stopped. Returns whatever stopped returns.
def run_once
  log.info("running #{name} pipeline once")
  started
  run_in_once
  run_out_once
  stopped
end

#run_out_onceObject

execute the out pipe once and then return



190
191
192
193
194
195
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 190

# Runs the outgoing pipe a single time: puts the vault in once_only mode
# and starts it, passing each event it yields to out_handler. Does nothing
# (returns nil) when allowed_out is nil or false.
def run_out_once
  return unless allowed_out

  log.debug("running #{name} 'out' pipeline once")
  vault.once_only = true
  vault.start do |outgoing|
    out_handler(outgoing)
  end
end

#startObject



197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 197

# Runs the pipeline continuously: calls started, then alternates one
# incoming and one outgoing pass, sleeping `delay` between iterations,
# until @running is cleared (see stop); finally calls stopped.
#
# An interrupt signal handler is installed that calls stop.
def start
  log.info "starting #{name} pipeline"
  @running = true
  # Signal names are case-sensitive in Ruby; the lowercase "sigint" used
  # previously is rejected with an ArgumentError (unsupported signal), so
  # the loop below was never reached.
  trap("INT") {self.stop}
  started
  while @running
    run_in_once
    run_out_once
    sleep delay
  end
  stopped
  log.info "#{name} stopped."
end

#startedObject



171
172
173
174
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 171

# Notifies both connectors that the pipeline has started: first the
# client, then the vault. Returns the vault's result.
def started
  client.started()
  vault.started()
end

#stopObject



211
212
213
214
215
216
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 211

# Asks the start loop to finish by clearing @running, after logging that
# the pipeline is stopping.
def stop
  log.info("#{name} stopping...")
  @running = false
  # Intended to wake the main thread from its sleep in the start loop; the
  # original author observed that it seems to have no effect.
  Thread.main.run
end

#stoppedObject



176
177
178
179
# File 'lib/ruby_sync/pipelines/base_pipeline.rb', line 176

# Notifies both connectors that the pipeline has stopped: first the
# client, then the vault. Returns the vault's result.
def stopped
  client.stopped()
  vault.stopped()
end