Class: Fluent::Plugin::JfrogSiemInput
- Inherits:
- Input
- Inheritance chain: Object < Input < Fluent::Plugin::JfrogSiemInput
- Defined in:
- lib/fluent/plugin/in_jfrog_siem.rb
Instance Method Summary
- #call_home(jpd_url) ⇒ Object
  Posts call-home usage information to the JPD.
- #configure(conf) ⇒ Object
  `configure` is called before `start`.
- #data_normalization(detailResp) ⇒ Object
  Normalizes Xray data according to common information models for all log vendors.
- #get_last_item_create_date ⇒ Object
  Reads the last item's created date from the pos_file and returns it as a string.
- #get_xray_violations(xray_json, jpd_url) ⇒ Object
  Queries the Xray API for violations based upon the input JSON.
- #get_xray_violations_detail(xray_violation_detail_url) ⇒ Object
  Queries the Xray API for the details of a single violation at the given URL.
- #pull_violation_details(xray_violation_detail_url) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
  `start` is called when starting and after `configure` is successfully completed.
Instance Method Details
#call_home(jpd_url) ⇒ Object
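Posts call-home usage information (a product ID and a list of feature IDs) to the `/artifactory/api/system/usage` endpoint of the given JPD URL.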
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 202

def call_home(jpd_url)
  call_home_json = { "productId": "jfrogLogAnalytics/v0.5.1", "features": [ { "featureId": "Platform/Xray" }, { "featureId": "Channel/xrayeventsiem" } ] }
  response = RestClient::Request.new(
    :method => :post,
    :url => jpd_url + "/artifactory/api/system/usage",
    :payload => call_home_json.to_json,
    :user => @username,
    :password => @apikey,
    :headers => { :accept => :json, :content_type => :json}
  ).execute do |response, request, result|
    puts "Posting call home information"
  end
end
#configure(conf) ⇒ Object
`configure` is called before `start`. `conf` is a `Hash` that includes the configuration parameters. If the configuration is invalid, raise `Fluent::ConfigError`.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 44

def configure(conf)
  super
  if @tag == ""
    raise Fluent::ConfigError, "Must define a tag for the SIEM data."
  end

  if @jpd_url == ""
    raise Fluent::ConfigError, "Must define the JPD URL to pull Xray SIEM violations."
  end

  if @username == ""
    raise Fluent::ConfigError, "Must define the username to use for authentication."
  end

  if @apikey == ""
    raise Fluent::ConfigError, "Must define the API Key to use for authentication."
  end

  if @pos_file == ""
    raise Fluent::ConfigError, "Must define a position file to record last SIEM violation pulled."
  end

  if @thread_count < 1
    raise Fluent::ConfigError, "Must define at least one thread to process violation details."
  end

  if @thread_count > @batch_size
    raise Fluent::ConfigError, "Violation detail url thread count exceeds batch size."
  end

  if @wait_interval < 1
    raise Fluent::ConfigError, "Wait interval must be greater than 1 to wait between pulling new events."
  end
end
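The checks in `configure` can be tried outside Fluentd with a small standalone sketch. The `Settings` struct and the `validate_settings` helper below are hypothetical names introduced for illustration and are not part of the plugin. `ArgumentError` stands in for `Fluent::ConfigError` so the snippet runs without the fluentd gem, and, unlike the original, a `nil` value is treated the same as an empty string.

```ruby
# Hypothetical stand-in for the plugin's configuration parameters.
Settings = Struct.new(:tag, :jpd_url, :username, :apikey, :pos_file,
                      :thread_count, :batch_size, :wait_interval,
                      keyword_init: true)

# Mirrors the rules enforced by `configure`. Raises ArgumentError instead of
# Fluent::ConfigError so that no Fluentd dependency is needed (assumption).
def validate_settings(s)
  required = { tag: "tag", jpd_url: "JPD URL", username: "username",
               apikey: "API key", pos_file: "position file" }
  required.each do |field, label|
    # Deviation from the original: nil is rejected as well as "".
    raise ArgumentError, "Must define the #{label}." if s[field].to_s.empty?
  end
  raise ArgumentError, "thread_count must be at least 1." if s.thread_count < 1
  raise ArgumentError, "thread_count exceeds batch_size." if s.thread_count > s.batch_size
  raise ArgumentError, "wait_interval must be at least 1." if s.wait_interval < 1
  true
end
```

Keeping the rules in one pure function like this makes the ordering visible: the string parameters are checked first, then the numeric constraints.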
#data_normalization(detailResp) ⇒ Object
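Normalizes Xray data according to common information models for all log vendors. Adds the `cve`, `cvss_score`, `cvss_version`, `policies`, `rules`, and `impacted_artifacts_url` fields to the parsed violation detail and returns it.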
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 254

def data_normalization(detailResp)
  detailResp_json = JSON.parse(detailResp)
  cve = []
  cvss_v2_list = []
  cvss_v3_list = []
  policy_list = []
  rule_list = []
  impacted_artifact_url_list = []
  if detailResp_json.key?('properties')
    properties = detailResp_json['properties']
    for index in 0..properties.length-1 do
      if properties[index].key?('cve')
        cve.push(properties[index]['cve'])
      end
      if properties[index].key?('cvss_v2')
        cvss_v2_list.push(properties[index]['cvss_v2'])
      end
      if properties[index].key?('cvss_v3')
        cvss_v3_list.push(properties[index]['cvss_v3'])
      end
    end

    detailResp_json["cve"] = cve.sort.reverse[0]
    cvss_v2 = cvss_v2_list.sort.reverse[0]
    cvss_v3 = cvss_v3_list.sort.reverse[0]
    if !cvss_v3.nil?
      cvss = cvss_v3
    elsif !cvss_v2.nil?
      cvss = cvss_v2
    end
    cvss_score = cvss[0..2]
    cvss_version = cvss.split(':')[1][0..2]
    detailResp_json["cvss_score"] = cvss_score
    detailResp_json["cvss_version"] = cvss_version
  end

  if detailResp_json.key?('matched_policies')
    matched_policies = detailResp_json['matched_policies']
    for index in 0..matched_policies.length-1 do
      if matched_policies[index].key?('policy')
        policy_list.push(matched_policies[index]['policy'])
      end
      if matched_policies[index].key?('rule')
        rule_list.push(matched_policies[index]['rule'])
      end
    end
    detailResp_json['policies'] = policy_list
    detailResp_json['rules'] = rule_list
  end

  impacted_artifacts = detailResp_json['impacted_artifacts']
  for impacted_artifact in impacted_artifacts do
    matchdata = impacted_artifact.match /default\/(?<repo_name>[^\/]*)\/(?<path>.*)/
    impacted_artifact_url = matchdata['repo_name'] + ":" + matchdata['path'] + " "
    impacted_artifact_url_list.append(impacted_artifact_url)
  end
  detailResp_json['impacted_artifacts_url'] = impacted_artifact_url_list
  return detailResp_json
end
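The string slicing in `data_normalization` is easier to follow on concrete values. The sketch below assumes a CVSS property shaped like `<score>/CVSS:<version>/<vector>` and an impacted-artifact path starting with `default/`; those shapes, the sample values, and the helper names `parse_cvss` and `artifact_label` are assumptions made for illustration, not taken from the plugin or from Xray documentation. Unlike the original method, which would raise `NoMethodError` when a violation has `properties` but no CVSS entry, the sketch returns `nil` in that case.

```ruby
# Hypothetical helper: slice a CVSS string the same way data_normalization does.
def parse_cvss(cvss)
  return nil if cvss.nil?            # guard that the original method lacks
  version_part = cvss.split(':')[1]  # text after the first colon
  return nil if version_part.nil?
  # The first three characters are taken as the score, as in the original,
  # so a score such as "10.0" would be truncated to "10.".
  { "cvss_score" => cvss[0..2], "cvss_version" => version_part[0..2] }
end

# Hypothetical helper: build the "repo:path " label used for
# impacted_artifacts_url, with the same named captures as the original regex.
def artifact_label(impacted_artifact)
  m = impacted_artifact.match(%r{default/(?<repo_name>[^/]*)/(?<path>.*)})
  return nil if m.nil?
  "#{m['repo_name']}:#{m['path']} "
end
```

With the assumed input `"7.5/CVSS:3.1/AV:N/AC:L"`, `parse_cvss` yields a score of `"7.5"` and a version of `"3.1"`; `artifact_label("default/libs-release/org/acme/app.jar")` yields `"libs-release:org/acme/app.jar "`, including the trailing space the original appends.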
#get_last_item_create_date ⇒ Object
Pulls the last item's created date from the pos_file and returns it as a string (the last line of the file). The pos_file is created if it does not exist.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 194

def get_last_item_create_date()
  if(!(File.exist?(@pos_file)))
    @pos_file = File.new(@pos_file, "w")
  end
  return IO.readlines(@pos_file).last
end
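A minimal sketch of the same "read the last line of the position file" idea follows. The `last_position` helper is a hypothetical name, not part of the plugin. It differs from the method above in two labeled ways: the path stays a `String` rather than being replaced by an open `File` object, and the trailing newline is stripped from the returned line.

```ruby
require "tmpdir"

# Hypothetical helper: return the last line of a position file, or nil when
# the file is empty. The file is created if it is missing.
def last_position(pos_path)
  File.write(pos_path, "") unless File.exist?(pos_path)
  # chomp: true drops the trailing newline (a deviation from the original,
  # which returns the raw line).
  File.readlines(pos_path, chomp: true).last
end

# Usage against a throwaway directory.
Dir.mktmpdir do |dir|
  path = File.join(dir, "jfrog_siem.pos")
  last_position(path)                        # file is created, result is nil
  File.write(path, "2021-01-01T00:00:00Z\n2021-01-02T00:00:00Z\n")
  last_position(path)                        # last recorded timestamp
end
```

An empty or missing file yields `nil`, which is why `run` wraps its `DateTime.parse` call in a `rescue` and falls back to the epoch.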
#get_xray_violations(xray_json, jpd_url) ⇒ Object
Queries the Xray API for violations based upon the input JSON. Returns the response body as a string on HTTP 200; any other status raises `Fluent::ConfigError`.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 235

def get_xray_violations(xray_json, jpd_url)
  response = RestClient::Request.new(
    :method => :post,
    :url => jpd_url + "/xray/api/v1/violations",
    :payload => xray_json.to_json,
    :user => @username,
    :password => @apikey,
    :headers => { :accept => :json, :content_type => :json}
  ).execute do |response, request, result|
    case response.code
    when 200
      return response.to_str
    else
      raise Fluent::ConfigError, "Cannot reach Artifactory URL to pull Xray SIEM violations."
    end
  end
end
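The `xray_json` argument is a plain Hash that `run` builds before each call. The sketch below shows that payload in isolation, without any network access; the `violations_query` helper name and the sample values are hypothetical, while the keys match the ones used in `run`.

```ruby
require "json"

# Hypothetical helper: build the request body that `run` passes to
# get_xray_violations as `xray_json`.
def violations_query(created_from, batch_size, offset)
  {
    "filters"    => { "created_from" => created_from },
    "pagination" => { "order_by" => "created",
                      "limit"    => batch_size,
                      "offset"   => offset }
  }
end

# Serialized the same way the plugin does, via #to_json.
body = violations_query("1970-01-01T00:00:00Z", 50, 1).to_json
```

Building the Hash in one place would also avoid the two near-identical literals in `run`, one of which passes the normalized date and the other the raw date string.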
#get_xray_violations_detail(xray_violation_detail_url) ⇒ Object
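Queries the Xray API for the details of a single violation at the given URL. Returns the response body as a string on HTTP 200; any other status raises `Fluent::ConfigError`.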
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 217

def get_xray_violations_detail(xray_violation_detail_url)
  response = RestClient::Request.new(
    :method => :get,
    :url => xray_violation_detail_url,
    :user => @username,
    :password => @apikey
  ).execute do |response, request, result|
    case response.code
    when 200
      return response.to_str
    else
      raise Fluent::ConfigError, "Cannot reach Artifactory URL to pull Xray SIEM violations."
    end
  end
end
#pull_violation_details(xray_violation_detail_url) ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 314

def pull_violation_details(xray_violation_detail_url)
  begin
    detailResp=get_xray_violations_detail(xray_violation_detail_url)
    time = Fluent::Engine.now
    detailResp_json = data_normalization(detailResp)
    router.emit(@tag, time, detailResp_json)
  rescue
    raise Fluent::ConfigError, "Error pulling violation details url #{xray_violation_detail_url}"
  end
end
#run ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 96

def run
  call_home(@jpd_url)
  # runs the violation pull
  last_created_date_string = get_last_item_create_date()
  begin
    last_created_date = DateTime.parse(last_created_date_string).strftime("%Y-%m-%dT%H:%M:%SZ")
  rescue
    last_created_date = DateTime.parse("1970-01-01T00:00:00Z").strftime("%Y-%m-%dT%H:%M:%SZ")
  end
  offset_count=1
  left_violations=0
  waiting_for_violations = false
  xray_json={"filters": { "created_from": last_created_date }, "pagination": {"order_by": "created","limit": @batch_size ,"offset": offset_count } }

  while true
    # Grab the batch of records
    resp=get_xray_violations(xray_json, @jpd_url)
    number_of_violations = JSON.parse(resp)['total_violations']
    if left_violations <= 0
      left_violations = number_of_violations
    end

    xray_violation_urls_list = []
    for index in 0..JSON.parse(resp)['violations'].length-1 do
      # Get the violation
      item = JSON.parse(resp)['violations'][index]

      # Get the created date and check if we should skip (already processed) or process this record.
      created_date_string = item['created']
      created_date = DateTime.parse(created_date_string).strftime("%Y-%m-%dT%H:%M:%SZ")

      # Determine if we need to persist this record or not
      persistItem = true
      if waiting_for_violations
        if created_date <= last_created_date
          # "not persisting it - waiting for violations"
          # waiting and same last timestamp (left violations in batch)
          persistItem = false
        end
        if created_date > last_created_date
          # new violation while waiting
          persistItem = true
          waiting_for_violations = false
        end
      else
        if created_date < last_created_date
          # "persisting everything"
          persistItem = true
        end
      end

      # Publish the record to fluentd
      if persistItem
        now = Fluent::Engine.now
        router.emit(@tag, now, item)

        # write to the pos_file created_date_string
        open(@pos_file, 'a') do |f|
          f << "#{created_date_string}\n"
        end

        # Mark this as the last record successfully processed
        last_created_date_string = created_date_string
        last_created_date = created_date

        # Grab violation detail url and add to url list to process w/ thread pool
        xray_violation_details_url=item['violation_details_url']
        xray_violation_urls_list.append(xray_violation_details_url)
      end
    end

    # iterate over url array adding to thread pool each url.
    # limit max workers to thread count to prevent overloading xray.
    thread_pool = Thread.pool(thread_count)
    thread_pool.process {
      for xray_violation_url in xray_violation_urls_list do
        pull_violation_details(xray_violation_url)
      end
    }
    thread_pool.shutdown

    # reduce left violations by jump size (not all batches have full item count??)
    left_violations = left_violations - @batch_size
    if left_violations <= 0
      waiting_for_violations = true
      sleep(@wait_interval)
    else
      # Grab the next record to process for the violation details url
      waiting_for_violations = false
      offset_count = offset_count + 1
      xray_json={"filters": { "created_from": last_created_date_string }, "pagination": {"order_by": "created","limit": @batch_size , "offset": offset_count } }
    end
  end
end
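Two pieces of `run` can be isolated as pure functions: the timestamp normalization with its epoch fallback, and the decision whether a record is emitted. The sketch below restates them under hypothetical names (`normalize_timestamp`, `persist?`, `TIMESTAMP_FORMAT`); it is an illustration of the logic shown above, not code from the plugin. Because every timestamp is rendered in the same fixed-width format, plain string comparison orders them chronologically.

```ruby
require "date"

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Hypothetical helper: render a timestamp in the fixed format used by `run`,
# falling back to the epoch when the value is missing or unparsable.
def normalize_timestamp(value)
  DateTime.parse(value.to_s).strftime(TIMESTAMP_FORMAT)
rescue ArgumentError # Date::Error, raised for invalid dates, is a subclass
  "1970-01-01T00:00:00Z"
end

# Hypothetical helper: while waiting for new violations, only records strictly
# newer than the last processed one are emitted; otherwise everything is.
def persist?(created, last_created, waiting)
  return true unless waiting
  created > last_created
end
```

In `run`, a record that passes this check while waiting also clears the `waiting_for_violations` flag, so the remaining records of that batch are emitted unconditionally.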
#shutdown ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 89

def shutdown
  @running = false
  @thread.join
  super
end
#start ⇒ Object
`start` is called when starting and after `configure` is successfully completed.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 82

def start
  super
  @running = true
  @thread = Thread.new(&method(:run))
end
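`start` and `shutdown` follow a common flag-plus-thread pattern: set `@running`, spawn a worker, then clear the flag and join. In the listing of `run`, the loop is `while true` and does not consult `@running`, so the join in `shutdown` only returns if the worker ends for some other reason. The sketch below shows one way a polling loop could honor the flag. `PollingWorker` and its members are hypothetical and stand apart from Fluentd; this is an illustration of the pattern, not the plugin's implementation.

```ruby
# Hypothetical worker illustrating the start/shutdown pattern with a loop
# that checks the running flag on every iteration.
class PollingWorker
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @ticks = 0
    @running = false
  end

  def start
    @running = true
    @thread = Thread.new { run }
  end

  def shutdown
    @running = false   # the loop observes this on its next iteration
    @thread.join       # returns once the current iteration finishes
  end

  def alive?
    !@thread.nil? && @thread.alive?
  end

  private

  def run
    while @running
      @ticks += 1        # placeholder for one polling cycle
      sleep(@interval)
    end
  end
end
```

With this shape, `shutdown` waits at most one polling cycle plus one `interval` before the thread exits.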