Class: Fluent::Plugin::JfrogSiemInput
- Inherits: Input
  - Object
  - Input
  - Fluent::Plugin::JfrogSiemInput
- Defined in: lib/fluent/plugin/in_jfrog_siem.rb
Instance Method Summary
- #call_home(jpd_url, access_token) ⇒ Object
  Posts "call home" product usage information to the Artifactory system usage endpoint.
- #configure(conf) ⇒ Object
  `configure` is called before `start`.
- #data_normalization(detailResp) ⇒ Object
  Normalizes Xray data according to common information models for all log vendors.
- #get_last_item_create_date ⇒ Object
  Reads the last item's created date from the pos_file and returns it as created_date_string.
- #get_xray_violations(xray_json, jpd_url, access_token) ⇒ Object
  Queries the Xray API for violations based on the input JSON.
- #get_xray_violations_detail(xray_violation_detail_url, access_token) ⇒ Object
  Queries the Xray API for the details of a single violation at the given detail URL.
- #pull_violation_details(xray_violation_detail_url, access_token) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
  `start` is called when starting and after `configure` is successfully completed.
Instance Method Details
#call_home(jpd_url, access_token) ⇒ Object
Posts "call home" product usage information to the Artifactory system usage endpoint.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 181

def call_home(jpd_url, access_token)
  call_home_json = { "productId": "jfrogLogAnalytics/v0.5.1", "features": [ { "featureId": "Platform/Xray" }, { "featureId": "Channel/xrayeventsiem" } ] }
  response = RestClient::Request.new(
    :method => :post,
    :url => jpd_url + "/artifactory/api/system/usage",
    :payload => call_home_json.to_json,
    :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + access_token }
  ).execute do |response, request, result|
    puts "Posting call home information"
  end
end
#configure(conf) ⇒ Object
`configure` is called before `start`. `conf` is a `Hash` that includes the configuration parameters. If the configuration is invalid, raise `Fluent::ConfigError`.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 43

def configure(conf)
  super

  if @tag == ""
    raise Fluent::ConfigError, "Must define a tag for the SIEM data."
  end

  if @jpd_url == ""
    raise Fluent::ConfigError, "Must define the JPD URL to pull Xray SIEM violations."
  end

  if @access_token == ""
    raise Fluent::ConfigError, "Must define the access token to use for authentication."
  end

  if @pos_file == ""
    raise Fluent::ConfigError, "Must define a position file to record last SIEM violation pulled."
  end

  if @thread_count < 1
    raise Fluent::ConfigError, "Must define at least one thread to process violation details."
  end

  if @thread_count > @batch_size
    raise Fluent::ConfigError, "Violation detail url thread count exceeds batch size."
  end

  # The check rejects values below 1, so the message says "at least 1".
  if @wait_interval < 1
    raise Fluent::ConfigError, "Wait interval must be at least 1 to wait between pulling new events."
  end
end
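The validation order in `configure` can be tried out without Fluentd. The following is a minimal sketch under stated assumptions: `validate_settings` and the `settings` hash are hypothetical names, the sample values are illustrative rather than plugin defaults, and `ArgumentError` stands in for `Fluent::ConfigError`.

```ruby
# Hypothetical helper mirroring the checks in `configure`.
# ArgumentError stands in for Fluent::ConfigError so this runs without Fluentd.
# Unlike the listing, `to_s.empty?` also rejects nil values.
def validate_settings(settings)
  %i[tag jpd_url access_token pos_file].each do |key|
    raise ArgumentError, "#{key} must not be empty" if settings[key].to_s.empty?
  end
  raise ArgumentError, "thread_count must be at least 1" if settings[:thread_count] < 1
  if settings[:thread_count] > settings[:batch_size]
    raise ArgumentError, "thread_count exceeds batch_size"
  end
  raise ArgumentError, "wait_interval must be at least 1" if settings[:wait_interval] < 1
  true
end

# Illustrative values only; none of these are defaults taken from the plugin.
valid = {
  tag: "jfrog.xray.siem",
  jpd_url: "https://jpd.example.com",
  access_token: "token",
  pos_file: "/tmp/jfrog_siem.pos",
  thread_count: 5,
  batch_size: 25,
  wait_interval: 10
}
validate_settings(valid)
```

Raising on the first failed check, as the listing does, means a misconfigured plugin reports one problem at a time.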
#data_normalization(detailResp) ⇒ Object
Normalizes Xray data according to common information models for all log vendors.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 228

def data_normalization(detailResp)
  detailResp_json = JSON.parse(detailResp)
  properties = detailResp_json['properties']
  cve = []
  cvss_v2_list = []
  cvss_v3_list = []
  for index in 0..properties.length-1 do
    if properties[index].key?('cve')
      cve.push(properties[index]['cve'])
    end
    if properties[index].key?('cvss_v2')
      cvss_v2_list.push(properties[index]['cvss_v2'])
    end
    if properties[index].key?('cvss_v3')
      cvss_v3_list.push(properties[index]['cvss_v3'])
    end
  end
  detailResp_json["cve"] = cve.sort.reverse[0]
  # `to_s` turns a missing value (nil from an empty list) into "" so the length checks cannot raise.
  cvss_v2 = cvss_v2_list.sort.reverse[0].to_s
  cvss_v3 = cvss_v3_list.sort.reverse[0].to_s
  if cvss_v3.length() > 0
    cvss = cvss_v3
  elsif cvss_v2.length() > 0
    cvss = cvss_v2
  end
  # Only derive score and version when a CVSS string was found.
  unless cvss.nil?
    detailResp_json["cvss_score"] = cvss[0..2]
    detailResp_json["cvss_version"] = cvss.split(':')[1][0..2]
  end
  return detailResp_json
end
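A worked example may make the CVSS selection and slicing easier to follow. This sketch assumes the CVSS values look like `<score>/CVSS:<version>/<vector>`; that shape is inferred from how the listing slices the string, and the sample data and the `pick_cvss` helper are hypothetical.

```ruby
# Hypothetical standalone version of the CVSS selection in `data_normalization`.
# Assumes each value has the shape "<score>/CVSS:<version>/<vector>".
def pick_cvss(properties)
  # Prefer the highest-sorting v3 string, then fall back to v2.
  v3 = properties.map { |p| p["cvss_v3"].to_s }.reject(&:empty?).max
  v2 = properties.map { |p| p["cvss_v2"].to_s }.reject(&:empty?).max
  cvss = v3 || v2
  return nil if cvss.nil?

  {
    "cvss_score" => cvss[0..2],                    # first three characters, e.g. "7.5"
    "cvss_version" => cvss.split(":")[1][0..2]     # text after "CVSS:", e.g. "3.1"
  }
end

# Made-up sample data for illustration.
properties = [
  { "cve" => "CVE-2021-0001", "cvss_v2" => "5.0/CVSS:2.0/AV:N/AC:L/Au:N/C:N/I:N/A:P" },
  { "cve" => "CVE-2021-0002", "cvss_v3" => "7.5/CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H" }
]
result = pick_cvss(properties)
puts result.inspect
```

Because both the sort and the slice operate on strings, a score of `10.0` would sort below `7.5` and be cut to `10.`. Comparing on the numeric score would avoid that, at the cost of parsing each value first.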
#get_last_item_create_date ⇒ Object
Reads the last item's created date from the pos_file and returns it as created_date_string.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 173

def get_last_item_create_date()
  if(!(File.exist?(@pos_file)))
    @pos_file = File.new(@pos_file, "w")
  end
  return IO.readlines(@pos_file).last
end
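The position-file behavior can be exercised in isolation. The sketch below is a hypothetical variant, not the plugin's code: `last_created_date` keeps the path as a `String` rather than replacing it with a `File` handle, and strips the trailing newline from the returned line. The file name and timestamps are illustrative.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical variant of `get_last_item_create_date`.
# Creates an empty position file if none exists, then returns its last line
# (without the trailing newline), or nil when the file is empty.
def last_created_date(pos_file_path)
  File.write(pos_file_path, "") unless File.exist?(pos_file_path)
  File.readlines(pos_file_path, chomp: true).last
end

dir = Dir.mktmpdir
path = File.join(dir, "jfrog_siem.pos")   # illustrative file name

first_read = last_created_date(path)      # file was just created, so there is no last line
File.open(path, "a") { |f| f << "2021-03-01T10:00:00Z\n" << "2021-03-02T11:30:00Z\n" }
second_read = last_created_date(path)     # last appended timestamp

FileUtils.remove_entry(dir)
```

An empty file yields `nil`, which is the case `run` handles by falling back to `1970-01-01T00:00:00Z`.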
#get_xray_violations(xray_json, jpd_url, access_token) ⇒ Object
Queries the Xray API for violations based on the input JSON.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 211

def get_xray_violations(xray_json, jpd_url, access_token)
  response = RestClient::Request.new(
    :method => :post,
    :url => jpd_url + "/xray/api/v1/violations",
    :payload => xray_json.to_json,
    :headers => { :accept => :json, :content_type => :json, Authorization:'Bearer ' + access_token }
  ).execute do |response, request, result|
    case response.code
    when 200
      return response.to_str
    else
      raise Fluent::StandardError, "Cannot reach Artifactory URL to pull Xray SIEM violations."
    end
  end
end
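To inspect what this request carries without contacting a server, the same URL, headers, and payload can be assembled with `Net::HTTP` from the standard library. This is a sketch that swaps `Net::HTTP` in for RestClient and never sends the request. `build_violations_request`, the host name, and the token are hypothetical.

```ruby
require "json"
require "net/http"
require "uri"

# Hypothetical helper: builds (but does not send) the POST that
# `get_xray_violations` issues, using Net::HTTP in place of RestClient.
def build_violations_request(xray_json, jpd_url, access_token)
  uri = URI.parse(jpd_url + "/xray/api/v1/violations")
  request = Net::HTTP::Post.new(uri)
  request["Accept"] = "application/json"
  request["Content-Type"] = "application/json"
  request["Authorization"] = "Bearer " + access_token
  request.body = xray_json.to_json
  request
end

# Same payload shape that `run` builds for the first page.
query = {
  filters: { created_from: "1970-01-01T00:00:00Z" },
  pagination: { order_by: "created", limit: 25, offset: 1 }
}
request = build_violations_request(query, "https://jpd.example.com", "token")
puts request.path
puts request.body
```

Sending it would be a matter of passing `request` to `Net::HTTP.start(host, port, use_ssl: true) { |http| http.request(request) }` and checking the response code, as the listing does for `200`.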
#get_xray_violations_detail(xray_violation_detail_url, access_token) ⇒ Object
Queries the Xray API for the details of a single violation at the given detail URL.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 194

def get_xray_violations_detail(xray_violation_detail_url, access_token)
  response = RestClient::Request.new(
    :method => :get,
    :url => xray_violation_detail_url,
    headers: {Authorization:'Bearer ' + access_token}
  ).execute do |response, request, result|
    case response.code
    when 200
      return response.to_str
    else
      raise Fluent::StandardError, "Cannot reach Artifactory URL to pull Xray SIEM violations."
    end
  end
end
#pull_violation_details(xray_violation_detail_url, access_token) ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 260

def pull_violation_details(xray_violation_detail_url, access_token)
  begin
    detailResp=get_xray_violations_detail(xray_violation_detail_url, access_token)
    time = Fluent::Engine.now
    detailResp_json = data_normalization(detailResp)
    router.emit(@tag, time, detailResp_json)
  rescue
    raise Fluent::StandardError, "Error pulling violation details url #{xray_violation_detail_url}"
  end
end
#run ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 91

def run
  call_home(@jpd_url, @access_token)

  # runs the violation pull
  last_created_date_string = get_last_item_create_date()
  begin
    last_created_date = DateTime.parse(last_created_date_string).strftime("%Y-%m-%dT%H:%M:%SZ")
  rescue
    last_created_date = DateTime.parse("1970-01-01T00:00:00Z").strftime("%Y-%m-%dT%H:%M:%SZ")
  end
  offset_count=1
  left_violations=0
  xray_json={"filters": { "created_from": last_created_date }, "pagination": {"order_by": "created","limit": @batch_size ,"offset": offset_count } }

  while true
    # Grab the batch of records
    resp=get_xray_violations(xray_json, @jpd_url, @access_token)
    number_of_violations = JSON.parse(resp)['total_violations']
    if left_violations <= 0
      left_violations = number_of_violations
    end

    xray_violation_urls_list = []
    for index in 0..JSON.parse(resp)['violations'].length-1 do
      # Get the violation
      item = JSON.parse(resp)['violations'][index]

      # Get the created date and check if we should skip (already processed) or process this record.
      created_date_string = item['created']
      created_date = DateTime.parse(created_date_string).strftime("%Y-%m-%dT%H:%M:%SZ")

      # Determine if we need to persist this record or not
      persistItem = true
      if created_date < last_created_date
        persistItem = false
      end

      # Publish the record to fluentd
      if persistItem
        now = Fluent::Engine.now
        router.emit(@tag, now, item)

        # write to the pos_file created_date_string
        open(@pos_file, 'a') do |f|
          f << "#{created_date_string}\n"
        end

        # Mark this as the last record successfully processed
        last_created_date_string = created_date_string
        last_created_date = created_date

        # Grab violation detail url and add to url list to process w/ thread pool
        xray_violation_details_url=item['violation_details_url']
        xray_violation_urls_list.append(xray_violation_details_url)
      end
    end

    # iterate over url array adding to thread pool each url.
    # limit max workers to thread count to prevent overloading xray.
    thread_pool = Thread.pool(thread_count)
    for xray_violation_url in xray_violation_urls_list do
      thread_pool.process {
        pull_violation_details(xray_violation_url, @access_token)
      }
    end
    thread_pool.shutdown

    # reduce left violations by jump size (not all batches have full item count??)
    left_violations = left_violations - @batch_size
    if left_violations <= 0
      sleep(@wait_interval)
    else
      # Grab the next record to process for the violation details url
      offset_count = offset_count + 1
      xray_json={"filters": { "created_from": last_created_date_string }, "pagination": {"order_by": "created","limit": @batch_size , "offset": offset_count } }
    end
  end
end
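The paging arithmetic in `run` (start at offset 1, subtract `@batch_size` from the remaining count after each batch, advance the offset while anything is left) can be traced with in-memory data. This is a sketch under assumptions: `fetch_page` is a hypothetical stand-in for `get_xray_violations`, it treats the offset as a 1-based page number, and the loop stops where the plugin would sleep.

```ruby
# Hypothetical stand-in for get_xray_violations: returns one page of items,
# treating `offset` as a 1-based page number.
def fetch_page(all_items, limit, offset)
  all_items.each_slice(limit).to_a[offset - 1] || []
end

# Walks every page using the same countdown as `run`.
def drain(all_items, batch_size)
  offset = 1
  left = 0
  seen = []
  loop do
    page = fetch_page(all_items, batch_size, offset)
    left = all_items.length if left <= 0   # plays the role of total_violations
    seen.concat(page)
    left -= batch_size                      # the last page may hold fewer items
    break if left <= 0                      # the plugin sleeps here instead of stopping
    offset += 1
  end
  seen
end

items = (1..7).map { |n| "violation-#{n}" }
processed = drain(items, 3)
puts processed.length
```

With 7 items and a batch size of 3, the countdown runs 7, 4, 1, then drops to -2, so three pages are requested and the short final page is still collected.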
#shutdown ⇒ Object
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 84

def shutdown
  @running = false
  @thread.join
  super
end
#start ⇒ Object
`start` is called when starting and after `configure` is successfully completed.
# File 'lib/fluent/plugin/in_jfrog_siem.rb', line 77

def start
  super
  @running = true
  @thread = Thread.new(&method(:run))
end
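`start` and `shutdown` together form a flag-plus-thread lifecycle: `start` sets `@running` and spawns the worker, and `shutdown` clears the flag and joins. As listed, `run` loops on `while true` and does not read `@running`. The sketch below shows one way the pattern could look when the loop does check the flag. `PollingWorker` and its methods are hypothetical and independent of Fluentd.

```ruby
# Hypothetical worker illustrating the start/run/shutdown pattern with a loop
# that checks the running flag, so `join` in shutdown can return.
class PollingWorker
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @ticks = 0
    @running = false
  end

  def start
    @running = true
    @thread = Thread.new(&method(:run))
  end

  def run
    while @running
      @ticks += 1        # stands in for one pull of violations
      sleep(@interval)   # stands in for the wait between pulls
    end
  end

  def shutdown
    @running = false
    @thread.join         # returns once the current sleep finishes
  end

  def alive?
    !@thread.nil? && @thread.alive?
  end
end

worker = PollingWorker.new(0.01)
worker.start
sleep(0.05)
worker.shutdown
puts "ticks: #{worker.ticks}"
```

With this shape, shutdown waits at most one interval for the loop to notice the flag. A long wait interval would delay shutdown by the same amount.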