# File 'lib/logstash/outputs/azure_loganalytics.rb', line 64
def multi_receive(events)
  flush_guid = SecureRandom.uuid
  @logger.debug("Start receive: #{flush_guid}. Received #{events.length} events")
  # Build one document per event and group documents by resolved log type.
  documents_by_log_type = {}
  events.each do |event|
    document = {}
    log_type_for_event = event.sprintf(@log_type)
    event_hash = event.to_hash
    if @key_names.length > 0
      # Keep only the configured keys, converting values where a type is given.
      keys_intersection = @key_names & event_hash.keys
      keys_intersection.each do |key|
        if @key_types.include?(key)
          document[key] = convert_value(@key_types[key], event_hash[key])
        else
          document[key] = event_hash[key]
        end
      end
    else
      document = event_hash
    end
    next if document.keys.length < 1
    documents_by_log_type[log_type_for_event] ||= []
    documents_by_log_type[log_type_for_event].push(document)
  end

  if documents_by_log_type.length < 1
    @logger.debug("No documents in batch. Skipping")
    return
  end

  # Post each log type's documents in slices of at most @max_batch_items.
  documents_by_log_type.each do |log_type_for_events, events_for_type|
    events_for_type.each_slice(@max_batch_items) do |event_batch|
      begin
        @logger.debug("Posting log batch (log count: #{event_batch.length}) as log type #{log_type_for_events} to DataCollector API. First log: " + event_batch[0].to_json.to_s)
        res = @client.post_data(log_type_for_events, event_batch, @time_generated_field)
        if Azure::Loganalytics::Datacollectorapi::Client.is_success(res)
          @logger.debug("Successfully posted logs as log type #{log_type_for_events} with result code #{res.code} to DataCollector API")
        else
          @logger.error("DataCollector API request failure (log type #{log_type_for_events}): error code: #{res.code}, data=>" + event_batch.to_json.to_s)
        end
      # Catch any error so one failed batch does not abort the whole flush.
      rescue Exception => ex
        @logger.error("Exception occurred in posting to DataCollector API as log type #{log_type_for_events}: '#{ex}', data=>" + event_batch.to_json.to_s)
      end
    end
  end
  @logger.debug("End receive: #{flush_guid}")
end
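The heart of this method is a group-then-batch pattern: each event's log type is resolved with event.sprintf(@log_type), documents are grouped under that type, and each group is posted to the Data Collector API in slices of at most @max_batch_items. Below is a minimal standalone sketch of that pattern; the sample events and batch size are hypothetical, and the API call is replaced with a puts.

# Minimal sketch of the group-then-batch pattern; sample data is hypothetical.
events = [
  { "type" => "Apache", "message" => "GET /" },
  { "type" => "Nginx",  "message" => "GET /health" },
  { "type" => "Apache", "message" => "POST /api" },
]
max_batch_items = 2

# Group documents by log type ("type" stands in for event.sprintf(@log_type)).
documents_by_log_type = events.group_by { |e| e["type"] }

# Post each group in slices no larger than max_batch_items.
documents_by_log_type.each do |log_type, docs|
  docs.each_slice(max_batch_items) do |batch|
    puts "would post #{batch.length} document(s) as log type #{log_type}"
  end
end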