Class: Envoi::WatchFolderUtility::WatchFolder

Inherits:
Object
  • Object
show all
Defined in:
lib/envoi/watch_folder_utility/watch_folder.rb,
lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb,
lib/cantemo/portal/agent/cli/commands/watch_folders-working.rb

Defined Under Namespace

Classes: Foreman, Handler, State, Worker

Constant Summary

DEFAULT_POLL_INTERVAL =
15
DEFAULT_PROCESSOR_COUNT_LIMIT =
10

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(args = { }) ⇒ WatchFolder

Returns a new instance of WatchFolder.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 23

# Builds a watch folder from its definition: stores the logger, normalizes a copy of the
# definition, creates the handler and resolves the agent.
#
# @param args [Hash]
# @option args [Object] :logger logger used by this instance (see #initialize_logger)
# @option args [Hash] :definition watch folder definition; shallow-copied (#dup) before it is
#   normalized by #process_watch_folder_def
# @option args [Boolean] :threaded stored in @threaded; defaults to true
# @option args [Object] :default_agent agent used when no agent is built from the definition
# @option args [Class] :default_agent_class class used to build agents from agent definitions;
#   defaults to the class of :default_agent
def initialize(args = { })
  initialize_logger(args)
  @definition = args[:definition].dup

  # object_id identifies this instance (Object.__id__ would be the id of the Object class itself).
  logger.debug { "Initializing Watch Folder. #{object_id}" }

  @ignored_file_paths_lock = Mutex.new

  @threaded = args.fetch(:threaded, true)

  @default_maximum_active_processors = DEFAULT_PROCESSOR_COUNT_LIMIT
  # Per-key default proc so each key gets its own Hash rather than a shared default object.
  @processors = Hash.new { |h, k| h[k] = {} }

  @default_handler_class = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen
  process_watch_folder_def

  initialize_handler

  @default_agent = args[:default_agent]
  @default_agent_class = args[:default_agent_class] || @default_agent.class

  @last_poll_time = nil

  process_agent_defs
  logger.debug { "Watch Folder Initialized. #{object_id}" }
end

Instance Attribute Details

#active_processors ⇒ Object

Returns the value of attribute active_processors.



16
17
18
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 16

# @return [Object] the current value of @active_processors
def active_processors; @active_processors; end

#agent ⇒ Object

Returns the value of attribute agent.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# @return [Object] the current value of @agent
def agent; @agent; end

#definition ⇒ Object

Returns the value of attribute definition.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# @return [Object] the current value of @definition
def definition; @definition; end

#handler ⇒ Object

Returns the value of attribute handler.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# @return [Object] the current value of @handler
def handler; @handler; end

#last_poll_time ⇒ Object

Returns the value of attribute last_poll_time.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# @return [Object] the current value of @last_poll_time
def last_poll_time; @last_poll_time; end

#logger ⇒ Object

Returns the value of attribute logger.



14
15
16
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 14

# @return [Object] the current value of @logger
def logger; @logger; end

#min_stable_poll_count ⇒ Object

Returns the value of attribute min_stable_poll_count.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# @return [Object] the current value of @min_stable_poll_count
def min_stable_poll_count; @min_stable_poll_count; end

#min_stable_time ⇒ Object

Returns the value of attribute min_stable_time.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# @return [Object] the current value of @min_stable_time
def min_stable_time; @min_stable_time; end

#poll_interval ⇒ Object

Returns the value of attribute poll_interval.



18
19
20
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 18

# @return [Object] the current value of @poll_interval
def poll_interval; @poll_interval; end

#processors ⇒ Object

Returns the value of attribute processors.



16
17
18
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 16

# @return [Object] the current value of @processors
def processors; @processors; end

Instance Method Details

#ignored_file_paths ⇒ Object



82
83
84
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 82

# Lists the file paths the handler is currently ignoring.
#
# @return [Array] the keys of the handler's ignored_files_map
def ignored_file_paths
  ignored_map = handler.ignored_files_map
  ignored_map.keys
end

#initialize_agent(args = {}) ⇒ Object



72
73
74
75
76
77
78
79
80
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 72

# Lazily builds the agent from @default_agent_class; an already-set @agent is returned as-is.
#
# @param args [Hash] agent configuration, passed to the agent class as `config:`
# @return [Object] the memoized agent instance
def initialize_agent(args = {})
  return @agent if @agent

  logger.debug { "Initializing Agent. #{@default_agent_class} #{args}" }
  built_agent = @default_agent_class.new(config: args, logger: logger, default_preserve_file_path: false)
  logger.debug { "Agent Instance created." }
  @agent = built_agent
end

#initialize_handler(watch_folder_def = @definition) ⇒ Object



118
119
120
121
122
123
124
125
126
127
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 118

# Instantiates @default_handler_class for the given definition and stores it in @handler.
#
# @param watch_folder_def [Hash] definition handed to the handler (defaults to @definition)
# @return [Object] the value of the final debug log call
def initialize_handler(watch_folder_def = @definition)
  handler_args = { logger: logger.dup, definition: watch_folder_def }
  klass = @default_handler_class

  logger.debug { "Creating Watch Folder Handler Instance. #{klass.name}" }
  @handler = klass.new(handler_args)
  logger.debug { "Watch Folder Handler Instance Created. #{klass.name}" }
end

#initialize_logger(args = {}) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 50

# Stores the logger supplied by the caller.
#
# Building a logger here (MultiIO targets from :log_to/:log_age, level parsing from
# :log_level) was sketched previously but is disabled; the caller passes the logger in.
#
# @param args [Hash] only :logger is read
# @return [Object, nil] the stored logger
def initialize_logger(args = {})
  @logger = args[:logger]
end

#name ⇒ Object



96
97
98
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 96

# Display name for this watch folder: the definition's 'name', else its non-empty 'paths',
# else 'path', else this instance's object_id.
#
# @return [Object] memoized in @name after the first call
def name
  @name ||= begin
    # Normalization stores an Array under 'paths'; an empty Array is truthy in Ruby and
    # would otherwise make the remaining fallbacks unreachable.
    paths = definition['paths']
    paths = nil if paths.respond_to?(:empty?) && paths.empty?
    # object_id identifies this instance (Object.__id__ is the id of the Object class).
    definition['name'] || paths || definition['path'] || object_id
  end
end

#poll ⇒ Object



328
329
330
331
332
333
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 328

# Records the poll timestamps, then asks the handler to poll if it supports polling.
#
# @return [Object, nil] the handler's poll result, or nil when the handler has no #poll
def poll
  @previous_poll_time, @last_poll_time = @last_poll_time, Time.now

  return unless handler.respond_to?(:poll)

  handler.poll
end

#poll_interval_elapsed? ⇒ Boolean

Returns:

  • (Boolean)


335
336
337
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 335

# True when no poll has been recorded yet, or when at least poll_interval seconds have
# passed since the last one.
#
# @return [Boolean]
def poll_interval_elapsed?
  previous = last_poll_time
  return true unless previous

  Time.now - previous >= poll_interval
end

#process_agent_def(agent_def) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 86

# Prepares one agent definition and hands it to #initialize_agent.
#
# When a default agent is present, its storages are merged under the definition's own
# 'storages', so entries in the definition take precedence. agent_def is modified in place.
#
# @param agent_def [Hash] string-keyed agent definition
# @return [Object] the result of #initialize_agent
def process_agent_def(agent_def)
  # Read the instance variable directly; there is no `default_agent` reader, so the previous
  # bare `default_agent` call raised NameError whenever a default agent was set.
  if @default_agent
    agent_def['storages'] = @default_agent.agent_config_storages.merge(agent_def['storages'] || {})
  end
  initialize_agent(agent_def)
end

#process_agent_defs ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 100

# Feeds every agent definition under definition['agents'] to #process_agent_def, then falls
# back to @default_agent when no agent was set.
#
# A Hash is read as { name => definition } (the key fills in a missing 'name'); an Array is
# read as a list of definitions. Any other value is ignored.
#
# @return [Object] the resolved @agent
def process_agent_defs
  case (agent_defs = definition['agents'])
  when Hash
    agent_defs.each do |agent_name, agent_def|
      agent_def['name'] ||= agent_name
      process_agent_def(agent_def)
    end
  when Array
    agent_defs.each { |agent_def| process_agent_def(agent_def) }
  end

  @agent ||= @default_agent
end

#process_watch_folder_def(watch_folder_def = @definition) ⇒ Object

Parameters:

  • watch_folder_def (Hash) (defaults to: @definition)

Options Hash (watch_folder_def):

  • path (String)
  • upload_to_storage_id (String)
  • name (String) — default: path
  • paths (Array<String>) — default: [path]
  • exclude (String) — default: '**/.*'
  • excludes (Array<String>) — default: [exclude]
  • include (String)
  • includes (Array<String>) — default: [include]
  • quarantine_directory_path (String)
  • completed_directory_path (String)
  • import_args (Hash) — default: {}
  • import_options (Hash) — default: {}
  • maximum_active_processors (Integer|False) — default: @default_maximum_active_processors
  • logging (Hash)


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 145

# Normalizes a watch folder definition in place and caches its poll interval.
#
# Folds singular keys into their plural forms ('path' -> 'paths', 'include' -> 'includes',
# 'exclude' -> 'excludes', 'agent' -> 'agents'), expands directory paths, and assembles the
# nested 'upload_args' hash (item add args/options plus import args/options) from flat keys.
# Top-level keys of watch_folder_def are rewritten; nested arrays and hashes the caller
# supplied are replaced with new objects rather than mutated.
#
# @param watch_folder_def [Hash] string-keyed definition (defaults to @definition)
# @return [false] when no watch path could be determined
# @return [Object] otherwise the value stored under 'poll_interval'
def process_watch_folder_def(watch_folder_def = @definition)
  logger.debug { "Initializing Watch Folder #{watch_folder_def.inspect}" }

  logger.debug { "Initializing parameter 'paths'." }
  name = watch_folder_def['name']

  # Build a new array: #initialize only shallow-copies the definition, so appending to the
  # existing 'paths' array would leak back into the caller's data.
  paths = [*watch_folder_def['paths'], *watch_folder_def['path']].map { |p| File.expand_path(p) }
  # Fall back to the name as a directory, but only when a name exists
  # (File.expand_path(nil) raises TypeError).
  if paths.empty? && name
    name_as_path = File.expand_path(name)
    paths << name_as_path if Dir.exist?(name_as_path)
  end
  paths.uniq!
  watch_folder_def['paths'] = paths
  watch_folder_def.delete('path')

  if paths.empty?
    logger.error { "Failed to initialize watch folder. No path found in watch folder definition." }
    return false
  end
  logger.debug { "Parameter 'paths' initialized." }

  logger.debug { "Initializing parameter 'includes'." }
  includes = [*watch_folder_def['includes'], *watch_folder_def['include']].uniq
  includes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['includes'] = includes
  watch_folder_def.delete('include')
  logger.debug { "Parameter `includes` initialized." }

  logger.debug { "Initializing parameter 'excludes'." }
  # '**/.*' is used when no 'exclude' key is given; it is added alongside any 'excludes'.
  exclude  = watch_folder_def['exclude'] || '**/.*'
  excludes = [*watch_folder_def['excludes'], *exclude].uniq
  excludes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['excludes'] = excludes
  watch_folder_def.delete('exclude')
  logger.debug { "Parameter `excludes` initialized." }


  logger.debug { "Initializing parameter `quarantine directory path`." }
  quarantine_directory_path = watch_folder_def['quarantine_directory_path'] || watch_folder_def['quarantine_path']
  if quarantine_directory_path
    quarantine_directory_path                     = File.expand_path(quarantine_directory_path)
    watch_folder_def['quarantine_directory_path'] = quarantine_directory_path

    unless Dir.exist?(quarantine_directory_path)
      logger.warn { "Quarantine directory path '#{quarantine_directory_path}' does not exist. Files will be ignored instead." }
    end
  end
  watch_folder_def.delete('quarantine_path')
  logger.debug { "Parameter `quarantine directory path` initialized." }

  logger.debug { "Initializing parameter 'completed directory path'." }
  completed_directory_path = watch_folder_def['completed_directory_path'] || watch_folder_def['completed_path']
  if completed_directory_path
    completed_directory_path                     = File.expand_path(completed_directory_path)
    watch_folder_def['completed_directory_path'] = completed_directory_path

    unless Dir.exist?(completed_directory_path)
      logger.warn { "Completed directory path '#{completed_directory_path}' does not exist. File will be ignored instead." }
    end
  end
  watch_folder_def.delete('completed_path')
  logger.debug { "Parameter 'completed directory path' initialized." }


  logger.debug { "Initializing parameter `upload to storage id`." }
  storage_id = watch_folder_def['upload_to_storage_id'] || watch_folder_def['storage_id']
  watch_folder_def['upload_to_storage_id'] ||= storage_id
  watch_folder_def.delete('storage_id')
  unless storage_id
    logger.warn { "No `upload to storage id` specified. Uploading will be skipped for this watch folder." }
  end
  logger.debug { "Parameter 'upload to storage id' initialized." }

  logger.debug { "Initializing parameter 'upload/import arguments'." }
  upload_args      = watch_folder_def['upload_args']
  # Symbolize up front: the fetches and `||=` defaults below use symbol keys, so string-keyed
  # entries from the definition would otherwise be ignored or silently overwritten. This also
  # produces new hashes, so the caller's nested hashes are not mutated.
  item_add_args    = symbolize_keys(watch_folder_def['item_add_args'] || { })
  item_add_options = symbolize_keys(watch_folder_def['item_add_options'] || { })
  import_args      = watch_folder_def['import_args'] || { }
  import_options   = watch_folder_def['import_options'] || { }

  import_args    = Hash[import_args.map { |k, v| [k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v] }]
  import_options = Hash[import_options.map { |k, v| [k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v] }]

  # Allow adding to collection to be overridden
  add_item_to_collection = item_add_args.fetch(:add_item_to_collection,
                                               item_add_options.fetch(:add_item_to_collection,
                                                                      watch_folder_def['add_item_to_collection']))
  if add_item_to_collection.nil? || add_item_to_collection
    _add_item_to_collection = false
    collection_id = watch_folder_def['collection_id']
    if collection_id
      item_add_args[:collection_id] ||= collection_id
      _add_item_to_collection = true
      watch_folder_def.delete('collection_id')
    else
      collection_name = watch_folder_def['collection_name']
      if collection_name
        item_add_args[:collection_name] ||= collection_name
        _add_item_to_collection = true
        watch_folder_def.delete('collection_name')
      else
        file_path_collection_name_position = watch_folder_def['file_path_collection_name_position']
        if file_path_collection_name_position
          item_add_args[:file_path_collection_name_position] = file_path_collection_name_position
          _add_item_to_collection = true
          watch_folder_def.delete('file_path_collection_name_position')
        end
      end
    end
    import_options[:add_item_to_collection] ||= _add_item_to_collection
  end

  metadata = watch_folder_def['metadata']
  if metadata
    item_add_args[:metadata] = metadata
    watch_folder_def.delete('metadata')
  end

  field_group = watch_folder_def['field_group']
  if field_group
    item_add_args[:field_group] = field_group
    watch_folder_def.delete('field_group')
  end

  ingest_group = watch_folder_def['ingest_group']
  if ingest_group
    # Append to any existing comma-separated job metadata; `+=` builds new strings, so a
    # caller-supplied value is not modified.
    job_metadata = import_args[:jobmetadata] || ''
    job_metadata += ',' unless job_metadata.empty?
    job_metadata += "portal_groups:StringArray=#{ingest_group}"
    import_args[:jobmetadata] = job_metadata
    watch_folder_def.delete('ingest_group')
  end

  # Symbolize again so nested hashes added above (e.g. metadata) also get symbol keys.
  item_add_args = symbolize_keys(item_add_args)
  (item_add_args[:import_args] ||= {}).merge!(import_args)
  (item_add_args[:import_options] ||= {}).merge!(import_options)

  upload_args = symbolize_keys(upload_args) || {}
  (upload_args[:item_add_args] ||= {}).merge!(item_add_args)
  (upload_args[:item_add_options] ||= {}).merge!(item_add_options)

  watch_folder_def.delete('import_args')
  watch_folder_def.delete('import_options')

  watch_folder_def['upload_args'] = upload_args
  logger.debug { "Parameter 'upload/import arguments' initialized. #{upload_args}" }

  maximum_active_processors = watch_folder_def['maximum_active_processors']
  if maximum_active_processors.nil?
    maximum_active_processors = @default_maximum_active_processors
    watch_folder_def['maximum_active_processors'] = maximum_active_processors
  end

  logger.debug { "Initializing parameter 'agents'." }
  agents = watch_folder_def['agents'] || []
  agents = if agents.is_a?(Hash)
             # A Hash of agents maps name => definition; the key fills in a missing 'name'.
             agents.map { |agent_name, agent_def| agent_def.merge('name' => agent_def['name'] || agent_name) }
           else
             Array(agents).dup
           end

  agent = watch_folder_def['agent']
  if agent
    if agent.is_a?(Hash) && agent.length == 1 && agent.values.first.is_a?(Hash)
      # { name => definition }
      agent_name, agent_def = agent.first
      agents << agent_def.merge('name' => agent_def['name'] || agent_name)
    elsif agent.is_a?(Hash)
      # The Hash is itself one agent definition; splatting it would yield [key, value] pairs.
      agents << agent unless agent.empty?
    else
      agents.concat(Array(agent))
    end
    watch_folder_def.delete('agent')
  end
  # Store the normalized list so #process_agent_defs sees every agent, including 'agent'.
  watch_folder_def['agents'] = agents
  logger.debug { "Parameter 'agent' initialized." }

  @poll_interval = watch_folder_def['poll_interval'] ||= DEFAULT_POLL_INTERVAL
end

#run ⇒ Object



339
340
341
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 339

# Starts the handler if it supports running.
#
# @return [Object, nil] the handler's run result, or nil when the handler has no #run
def run
  return unless handler.respond_to?(:run)

  handler.run
end

#stable_files ⇒ Object



343
344
345
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 343

# Delegates to the handler's #stable_files.
#
# @return [Object] whatever the handler reports
def stable_files
  current_handler = handler
  current_handler.stable_files
end

#symbolize_keys(value, recursive = true) ⇒ Object

Converts hash keys to symbols

Parameters:

  • value (Hash)

    hash

  • recursive (Boolean) (defaults to: true)

    Will recurse into any values that are hashes or arrays



351
352
353
354
355
356
357
358
359
360
# File 'lib/envoi/watch_folder_utility/watch_folder.rb', line 351

# Returns a copy of value with Hash keys converted to symbols where the key supports #to_sym.
#
# Arrays are walked element by element. Hash values are descended into only when
# recursive is true. Any other value is returned unchanged.
#
# @param value [Object] Hash, Array or any other value
# @param recursive [Boolean] whether to descend into Hash values (defaults to true)
# @return [Object] a new Hash/Array, or value itself for other types
def symbolize_keys(value, recursive = true)
  if value.is_a?(Hash)
    value.each_with_object({}) do |(key, val), out|
      new_key = key.respond_to?(:to_sym) ? key.to_sym : key
      out[new_key] = recursive ? symbolize_keys(val, true) : val
    end
  elsif value.is_a?(Array)
    value.map { |element| symbolize_keys(element, recursive) }
  else
    value
  end
end