Class: Toccatore::UsageUpdate

Inherits:
Base
  • Object
show all
Includes:
Queue
Defined in:
lib/toccatore/usage_update.rb

Constant Summary collapse

LICENSE =
"https://creativecommons.org/publicdomain/zero/1.0/"

Constants inherited from Base

Base::ICON_URL

Instance Method Summary collapse

Methods included from Queue

#delete_message, #get_message, #get_total, #queue, #queue_url

Methods inherited from Base

#cleanup_author, #get_authors, #get_doi_ra, #get_hashed_authors, #get_name_identifier, #get_one_author, #get_one_hashed_author, #get_query_url, #get_total, #is_personal_name?, #job_batch_size, #name_detector, #normalize_doi, #orcid_as_url, #orcid_from_url, #send_notification_to_slack, #timeout, #unfreeze, #url, #validate_doi, #validate_orcid, #validate_prefix

Constructor Details

#initialize(options = {}) ⇒ UsageUpdate

Returns a new instance of UsageUpdate.



10
11
12
# File 'lib/toccatore/usage_update.rb', line 10

def initialize options={}
  @sqs = queue options
end

Instance Method Details

#format_event(type, data, options) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/toccatore/usage_update.rb', line 89

def format_event type, data, options
  { "uuid" => SecureRandom.uuid,
    "message-action" => "add",
    "subj-id" => data[:report_id],
    "subj"=> {
      "pid"=> data[:report_id],
      "issued"=> data[:created]
    },
    "total"=> data[:count],
    "obj-id" => data[:pid],
    "relation-type-id" => type,
    "source-id" => "datacite-usage",
    "source-token" => options[:source_token],
    "occurred-at" => data[:created_at],
    "license" => LICENSE 
  }
end

#get_data(reponse) ⇒ Object



56
57
58
59
60
61
# File 'lib/toccatore/usage_update.rb', line 56

def get_data reponse 
  return OpenStruct.new(body: { "errors" => "Queue is empty" }) if reponse.messages.empty?

  body = JSON.parse(reponse.messages[0].body)
  Maremma.get(body["report_id"])
end

#metrics_urlObject



81
82
83
# File 'lib/toccatore/usage_update.rb', line 81

def metrics_url
  ENV['SASHIMI_URL']
end

#parse_data(result, options = {}) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/toccatore/usage_update.rb', line 107

def parse_data(result, options={})
  return result.body.fetch("errors") if result.body.fetch("errors", nil).present?

  items = result.body.dig("data","report","report-datasets")
  header = result.body.dig("data","report","report-header")
  report_id = "#{metrics_url}/#{result.body.dig("data","report","id")}"

  created = header.fetch("created")
  Array.wrap(items).reduce([]) do |x, item|
    data = {}
    data[:doi] = item.dig("dataset-id").first.dig("value")
    data[:pid] = normalize_doi(data[:doi])
    data[:created] = created
    data[:report_id] = report_id
    data[:created_at] = created

    instances = item.dig("performance", 0, "instance")

    return x += [OpenStruct.new(body: { "errors" => "There are too many instances in #{data[:doi]} for report #{report_id}. There can only be 4" })] if instances.size > 8
 
    x += Array.wrap(instances).reduce([]) do |ssum, instance|
      data[:count] = instance.dig("count")
      event_type = "#{instance.dig("metric-type")}-#{instance.dig("access-method")}"
      ssum << format_event(event_type, data, options)
      ssum
    end
  end    
end

#process_data(options = {}) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/toccatore/usage_update.rb', line 43

def process_data(options = {})
  message = get_message
  return [OpenStruct.new(body: { "data" => [] })] if message.empty?
  data = get_data(message)
  events = parse_data(data, options)
  return [OpenStruct.new(body: { "data" => [] })] if events.empty?
  errors = push_data(events, options)
  if errors < 1
    delete_message message
  end
  errors
end

#push_data(items, options = {}) ⇒ Object

method returns number of errors



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/toccatore/usage_update.rb', line 65

def push_data(items, options={})
  if items.empty?
    puts "No works found in the Queue."
    0
  elsif options[:access_token].blank?
    puts "An error occured: Access token missing."
    options[:total]
  else
    error_total = 0
    Array(items).each do |item|
      error_total += push_item(item, options)
    end
    error_total
  end
end

#push_item(item, options = {}) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/toccatore/usage_update.rb', line 136

def push_item(item, options={})
  return OpenStruct.new(body: { "errors" => [{ "title" => "Access token missing." }] }) if options[:access_token].blank?
  return 0 if item == "Queue is empty"

  host = options[:push_url].presence || "https://api.test.datacite.org"
  push_url = host + "/events/" + item["uuid"].to_s
  data = { "data" => {
              "id" => item["uuid"],
              "type" => "events",
              "attributes" => item.except("id") }}
  response = Maremma.put(push_url, data: data.to_json,
                                    bearer: options[:access_token],
                                    content_type: 'application/json',
                                    host: host)
                                    
  if response.status == 201 
    puts "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} pushed to Event Data service."
    0
  elsif response.status == 200
    puts "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} pushed to Event Data service for update."
    0
  elsif response.body["errors"].present?
    puts "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} had an error:"
    puts "#{response.body['errors'].first['title']}"
    1
  end
end

#queue_jobs(options = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/toccatore/usage_update.rb', line 14

def queue_jobs(options={})

  total = get_total(options)
  
  if total < 1
    text = "No works found for in the Usage Reports Queue."
  end

  error_total = 0
  proccessed_messages = 0
  num_messages = total
  while num_messages != 0 
      processed = process_data(options)
      error_total += processed
      proccessed_messages += 1 if processed == 0
      num_messages -= proccessed_messages
  end
  text = "#{proccessed_messages} works processed with #{error_total} errors for Usage Reports Queue #{queue_url}"

  puts text
  # send slack notification
  options[:level] = total > 0 ? "good" : "warning"
  options[:title] = "Report for #{source_id}"
  send_notification_to_slack(text, options) if options[:slack_webhook_url].present?

  # return number of works queued
  proccessed_messages
end

#source_idObject



85
86
87
# File 'lib/toccatore/usage_update.rb', line 85

def source_id
  "usage_update"
end