Class: GlobalDaemonThread

Inherits:
BaseThread show all
Defined in:
lib/daemon/global_daemon_thread.rb

Instance Method Summary collapse

Methods inherited from BaseThread

#initialize

Constructor Details

This class inherits a constructor from BaseThread

Instance Method Details

#delete_deleted_itemsObject



95
96
97
98
99
100
101
102
103
104
105
# File 'lib/daemon/global_daemon_thread.rb', line 95

def delete_deleted_items
  $log.debug("GlobalDaemonThread: delete_deleted_items")
  
  r = FC::DB.query("SELECT i.id FROM #{FC::Item.table_name} as i LEFT JOIN #{FC::ItemStorage.table_name} as ist ON i.id=ist.item_id WHERE i.status = 'delete' AND ist.id IS NULL")
  ids = r.map{|row| row['id']}
  if ids.count > 0
    ids = ids.join(',')
    FC::DB.query("DELETE FROM #{FC::Item.table_name} WHERE id in (#{ids})")
    $log.info("GlobalDaemonThread: delete items #{ids}")
  end
end

#go(timeout) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/daemon/global_daemon_thread.rb', line 2

def go(timeout)
  $log.info("Start global daemon thread with timeout=#{timeout}")
  while true do
    exit if $exit_signal
    sleep timeout.to_f/2
    exit if $exit_signal
    
    if FC::Var.get('global_daemon_host') == FC::Storage.curr_host
      FC::Var.set('global_daemon_host', FC::Storage.curr_host)
    else
      $log.info("Exit from GlobalDaemonThread: global daemon already running on #{FC::Var.get('global_daemon_host')}")
      FC::DB.close
      exit
    end
    
    make_item_copies
    make_item_mark_deleted
    make_deleted_error_items_storages
    make_deleted_error_items
    delete_deleted_items
    #TODO: периодически удалять (проставлять статус delete) для лиших is (число копий больше необходимого)
  end
end

#make_deleted_error_itemsObject



115
116
117
118
119
120
121
# File 'lib/daemon/global_daemon_thread.rb', line 115

def make_deleted_error_items
  $log.debug("GlobalDaemonThread: make_deleted_error_items")
  ttl = FC::Var.get('daemon_global_error_items_ttl', 86400).to_i
  cnt = FC::DB.query("SELECT count(*) as cnt FROM #{FC::Item.table_name} WHERE status = 'error' AND time < #{Time.new.to_i - ttl}").first['cnt']
  $log.debug("GlobalDaemonThread: mark deleted #{cnt} items")
  FC::DB.query("UPDATE #{FC::Item.table_name} SET status = 'delete' WHERE status = 'error' AND time < #{Time.new.to_i - ttl}")
end

#make_deleted_error_items_storagesObject



107
108
109
110
111
112
113
# File 'lib/daemon/global_daemon_thread.rb', line 107

def make_deleted_error_items_storages
  $log.debug("GlobalDaemonThread: make_deleted_error_items_storages")
  ttl = FC::Var.get('daemon_global_error_items_storages_ttl', 86400).to_i
  cnt = FC::DB.query("SELECT count(*) as cnt FROM #{FC::ItemStorage.table_name} WHERE status IN ('error', 'copy') AND time < #{Time.new.to_i - ttl}").first['cnt']
  $log.debug("GlobalDaemonThread: mark deleted #{cnt} items storages")
  FC::DB.query("UPDATE #{FC::ItemStorage.table_name} SET status = 'delete' WHERE status IN ('error', 'copy') AND time < #{Time.new.to_i - ttl}")
end

#make_item_copiesObject

make item copies by policy



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/daemon/global_daemon_thread.rb', line 27

def make_item_copies
  $log.debug("GlobalDaemonThread: make_item_copies")
  
  all_storages = FC::Storage.where
  all_policies = FC::Policy.where
  
  limit = FC::Var.get('daemon_global_tasks_group_limit', 1000).to_i
  all_policies.each do |policy|
    next if policy.copies.to_i < 2
    copies = (1..policy.copies.to_i-1).to_a.join(',')
    sql = "SELECT i.id as item_id, i.size, i.copies as item_copies, i.name, i.tag, i.dir, GROUP_CONCAT(ist.storage_name ORDER BY ist.id) as storages "+
      "FROM #{FC::Item.table_name} as i, #{FC::ItemStorage.table_name} as ist WHERE i.policy_id = #{policy.id} AND "+
      "ist.item_id = i.id AND i.copies IN (#{copies}) AND i.status = 'ready' AND ist.status <> 'delete' GROUP BY i.id LIMIT #{limit}"
    r = FC::DB.query(sql)
    r.each do |row|
      item_storages = row['storages'].split(',')
      $log.info("GlobalDaemonThread: new item_storage for item #{row['item_id']}, exclude #{item_storages}")
      if row['item_copies'] != item_storages.count
        $log.warn("GlobalDaemonThread: ItemStorage count <> item.copies for item #{row['item_id']}")
      elsif item_storages.count >= policy.copies.to_i
        $log.warn("GlobalDaemonThread: ItemStorage count >= policy.copies for item #{row['item_id']}")
      else
        src_storage = all_storages.detect{|s| item_storages.first == s.name}
        if src_storage
          storage = FC::CopyRule.get_proper_storage_for_copy(
            :item_id     => row['item_id'],
            :size        => row['size'],
            :item_copies => row['item_copies'],
            :name        => row['name'],
            :tag         => row['tag'],
            :dir         => row['dir'].to_i == 1,
            :src_storage => src_storage,
            :exclude     => item_storages
          )
          storage = src_storage.get_proper_storage_for_copy(row['size'], item_storages) unless storage
        end
        if storage
          FC::Item.new(:id => row['item_id'], :size => row['size']).make_item_storage(storage, 'copy')
        else
          error 'No available storage', :item_id => row['item_id']
        end
      end
    end
  end
end

#make_item_mark_deletedObject

mark deleted for deferred_delete items



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/daemon/global_daemon_thread.rb', line 74

def make_item_mark_deleted
  $log.debug("GlobalDaemonThread: make_item_mark_deleted")
  
  limit = FC::Var.get('daemon_global_tasks_group_limit', 1000).to_i
  loop do
    r = FC::DB.query %(
      SELECT i.id FROM #{FC::Item.table_name} as i, #{FC::Policy.table_name} as p 
      WHERE 
        i.status = 'deferred_delete' AND i.policy_id = p.id AND 
        i.time + p.delete_deferred_time < UNIX_TIMESTAMP()
      LIMIT #{limit}
    )
    break if r.count == 0
    ids = r.map{|e| e['id']}.join(',')
    $log.info("GlobalDaemonThread: mark delete items: #{ids}")
    FC::DB.query("UPDATE #{FC::ItemStorage.table_name} SET status='delete' WHERE item_id in (#{ids})")
    FC::DB.query("UPDATE #{FC::Item.table_name} SET status='delete' WHERE id in (#{ids})")
    sleep 1
  end
end