Class: Pf_Lab_Interface
- Defined in:
- lib/publisher/pf_lab_interface.rb
Constant Summary collapse
- ORDERS =
"orders"
- ORDERS_SORTED_SET =
"orders_sorted_set"
- BARCODES =
"barcodes"
- BARCODE =
"barcode"
- BASE_URL =
"http://localhost:3000/"
- UPDATE_QUEUE =
"update_queue"
- DEFAULT_LOOK_BACK_IN_SECONDS =
will look back 12 hours if no previous request is found.
12*3600
- DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS =
time to keep old orders in memory 48 hours, expressed as seconds.
48*3600
- POLL_URL_PATH =
the last request that was made and what it said.
BASE_URL + "interfaces"
- PUT_URL_PATH =
BASE_URL + "lis_update_orders"
- LAST_REQUEST =
"last_request"
- FROM_EPOCH =
"from_epoch"
- TO_EPOCH =
"to_epoch"
- SIZE =
"size"
- SKIP =
"skip"
- ID =
"id"
- REPORTS =
"reports"
- TESTS =
"tests"
- RESULT_RAW =
"result_raw"
- CATEGORIES =
"categories"
- USE_CATEGORY_FOR_LIS =
"use_category_for_lis"
- LIS_CODE =
"lis_code"
- REQUIREMENTS =
"requirements"
- ITEMS =
"items"
- CODE =
"code"
- ORDERS_TO_UPDATE_PER_CYCLE =
10
Constants inherited from Poller
Poller::COMPLETED, Poller::EDTA, Poller::ESR, Poller::FLUORIDE, Poller::LAST_REQUEST_AT, Poller::LAST_REQUEST_STATUS, Poller::PLASMA, Poller::POLL_STATUS_KEY, Poller::REQUISITIONS_HASH, Poller::REQUISITIONS_SORTED_SET, Poller::RUNNING, Poller::SERUM, Poller::URINE
Instance Attribute Summary collapse
-
#lis_security_key ⇒ Object
Returns the value of attribute lis_security_key.
Instance Method Summary collapse
- #add_barcode(code, order_id) ⇒ Object
-
#add_order(order) ⇒ Object
@param order : order object, as a hash.
- #add_test_result(order, res) ⇒ Object
-
#all_hits_downloaded?(last_request) ⇒ Boolean
def delete_last_request $redis.del(LAST_REQUEST) end.
- #build_request ⇒ Object
-
#commit_request_params_to_redis(response_hash) ⇒ Object
commits the request params to redis.
- #fresh_request_params(from_epoch = nil) ⇒ Object
-
#get_barcode(barcode) ⇒ Object
@return the entry at the barcode, or nil.
- #get_last_request ⇒ Object
- #get_order(order_id) ⇒ Object
-
#get_priority_category(req) ⇒ Object
@param req : the requirement hash.
-
#initialize(mpg = nil, lis_security_key) ⇒ Pf_Lab_Interface
constructor
METHODS OVERRIDDEN FROM THE BASIC POLLER.
- #poll ⇒ Object
- #poll_LIS_for_requisition ⇒ Object
-
#process_update_queue ⇒ Object
data = [ { :id => “ARUBA”, :results => [ { :name => “TLCparam”, :value => 10 }, { :name => “Nparam”, :value => 23 }, { :name => “ANCparam”, :value => 25 }, { :name => “Lparam”, :value => 10 }, { :name => “ALCparam”, :value => 44 }, { :name => “Mparam”, :value => 55 }, { :name => “AMCparam”, :value => 22 }, { :name => “Eparam”, :value => 222 }, { :name => “AECparam”, :value => 21 }, { :name => “BASOparam”, :value => 222 }, { :name => “ABCparam”, :value => 300 }, { :name => “RBCparam”, :value => 2.22 }, { :name => “HBparam”, :value => 19 }, { :name => “HCTparam”, :value => 22 }, { :name => “MCVparam”, :value => 222 }, { :name => “MCHparam”, :value => 21 }, { :name => “MCHCparam”, :value => 10 }, { :name => “MCVparam”, :value => 222 }, { :name => “RDWCVparam”, :value => 12 }, { :name => “PCparam”, :value => 1.22322 } ] } ].
- #queue_order_for_update(order) ⇒ Object
- #remove_barcode(barcode) ⇒ Object
-
#remove_old_orders ⇒ Object
removes any orders that start from now - 4 days ago now - 2 days ago.
-
#remove_order(order_id) ⇒ Object
UTILITY METHOD FOR THE ORDER AND BARCODE HASHES ADD AND REMOVE.
-
#request_size_completed?(response_hash) ⇒ Boolean
since we request only a certain set of orders per request we need to know if the earlier request has been completed or we still need to rerequest the same time frame again.
- #update(data) ⇒ Object
-
#update_order(order) ⇒ Object
start work on simple.
Methods inherited from Poller
#assign_tube, #build_tests_hash, #default_checkpoint, #determine_tube, #get_checkpoint, #merge_with_requisitions_hash, #post_poll_LIS, #pre_poll_LIS, #prepare_redis, #process_LIS_response, #root_path, #update_LIS
Constructor Details
#initialize(mpg = nil, lis_security_key) ⇒ Pf_Lab_Interface
266 267 268 269 270 |
# File 'lib/publisher/pf_lab_interface.rb', line 266 def initialize(mpg=nil,lis_security_key) super(mpg) self.lis_security_key = lis_security_key AstmServer.log("Initialized Lab Interface") end |
Instance Attribute Details
#lis_security_key ⇒ Object
Returns the value of attribute lis_security_key.
38 39 40 |
# File 'lib/publisher/pf_lab_interface.rb', line 38 def lis_security_key @lis_security_key end |
Instance Method Details
#add_barcode(code, order_id) ⇒ Object
188 189 190 |
# File 'lib/publisher/pf_lab_interface.rb', line 188 def (code,order_id) $redis.hset(BARCODES,code,order_id) end |
#add_order(order) ⇒ Object
@param order : order object, as a hash.
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/publisher/pf_lab_interface.rb', line 134 def add_order(order) ## this whole thing should be done in one transaction order[REPORTS].each do |report| test_machine_codes = report[TESTS].map{|c| $inverted_mappings[c[LIS_CODE]] }.compact.uniq report[REQUIREMENTS].each do |req| get_priority_category(req)[ITEMS].each do |item| if !item[BARCODE].blank? (item[BARCODE],JSON.generate( { :order_id => order[ID], :machine_codes => test_machine_codes } )) elsif !item[CODE].blank? (item[CODE],JSON.generate({ :order_id => order[ID], :machine_codes => test_machine_codes })) end end end end $redis.hset(ORDERS,order[ID],JSON.generate(order)) $redis.zadd(ORDERS_SORTED_SET,Time.now.to_i,order[ID]) end |
#add_test_result(order, res) ⇒ Object
174 175 176 177 178 179 180 181 182 |
# File 'lib/publisher/pf_lab_interface.rb', line 174 def add_test_result(order,res) order[REPORTS.to_sym].each do |report| report[TESTS.to_sym].each_with_index{|t,k| if t[LIS_CODE.to_sym] == $mappings[res[:name]] t[RESULT_RAW.to_sym] = res[:value] end } end end |
#all_hits_downloaded?(last_request) ⇒ Boolean
def delete_last_request $redis.del(LAST_REQUEST) end
201 202 203 |
# File 'lib/publisher/pf_lab_interface.rb', line 201 def all_hits_downloaded?(last_request) last_request[FROM_EPOCH] == last_request[SIZE] end |
#build_request ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/publisher/pf_lab_interface.rb', line 213 def build_request last_request = get_last_request params = nil if last_request.blank? params = fresh_request_params else if all_hits_downloaded?(last_request) params = fresh_request_params(last_request[:to_epoch]) else params = last_request end end params.merge!(lis_security_key: self.lis_security_key) Typhoeus::Request.new(POLL_URL_PATH,params: params) end |
#commit_request_params_to_redis(response_hash) ⇒ Object
commits the request params to redis. the response hash is expected to have whatever parameters were sent into it in the request. so it must always return: a -> how many it was told to skip (SKIP) b -> from_epoch : from which epoch it was queried. c -> to_epoch : to which epoch it was queried.
235 236 237 238 239 240 |
# File 'lib/publisher/pf_lab_interface.rb', line 235 def commit_request_params_to_redis(response_hash) $redis.hset(LAST_REQUEST,SKIP,response_hash[SKIP].to_i + response_hash[ORDERS].size.to_i) $redis.hset(LAST_REQUEST,SIZE,response_hash[SIZE].to_i) $redis.hset(LAST_REQUEST,FROM_EPOCH,response_hash[FROM_EPOCH].to_i) $redis.hset(LAST_REQUEST,TO_EPOCH,response_hash[TO_EPOCH].to_i) end |
#fresh_request_params(from_epoch = nil) ⇒ Object
205 206 207 208 209 210 211 |
# File 'lib/publisher/pf_lab_interface.rb', line 205 def fresh_request_params(from_epoch=nil) params = {} params[TO_EPOCH] = Time.now.to_i params[FROM_EPOCH] = from_epoch || (params[TO_EPOCH] - DEFAULT_LOOK_BACK_IN_SECONDS) params[SKIP] = 0 params end |
#get_barcode(barcode) ⇒ Object
@return the entry at the barcode, or nil. key (order_id) value (array of tests registered on that barcode, the names of the tests are the machine codes, and not the lis_codes) this key is generated originally in add_barcode
103 104 105 106 107 108 109 |
# File 'lib/publisher/pf_lab_interface.rb', line 103 def () if = $redis.hget(BARCODES,) JSON.parse().deep_symbolize_keys else nil end end |
#get_last_request ⇒ Object
192 193 194 |
# File 'lib/publisher/pf_lab_interface.rb', line 192 def get_last_request $redis.hgetall(LAST_REQUEST) end |
#get_order(order_id) ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/publisher/pf_lab_interface.rb', line 111 def get_order(order_id) if order_string = $redis.hget(ORDERS,order_id) JSON.parse(order_string).deep_symbolize_keys else nil end end |
#get_priority_category(req) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/publisher/pf_lab_interface.rb', line 121 def get_priority_category(req) priority_category = req[CATEGORIES].select{|c| c[USE_CATEGORY_FOR_LIS] == 1 } if priority_category.blank? priority_category = req[CATEGORIES][0] else priority_category = priority_category[0] end priority_category end |
#poll ⇒ Object
476 477 478 479 480 481 |
# File 'lib/publisher/pf_lab_interface.rb', line 476 def poll pre_poll_LIS poll_LIS_for_requisition update_LIS post_poll_LIS end |
#poll_LIS_for_requisition ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/publisher/pf_lab_interface.rb', line 272 def poll_LIS_for_requisition AstmServer.log("Polling LIS at url:#{BASE_URL}") request = build_request request.on_complete do |response| if response.success? response_hash = JSON.parse(response.body) orders = response_hash[ORDERS] orders.each do |order| add_order(order) end commit_request_params_to_redis(response_hash) elsif response.timed_out? # aw hell no # put to astm log. AstmServer.log("Polling time out") elsif response.code == 0 # Could not get an http response, something's wrong. AstmServer.log(response.) else # Received a non-successful http response. AstmServer.log("HTTP request failed: " + response.code.to_s) end end request.run end |
#process_update_queue ⇒ Object
data = [
{
:id => "ARUBA",
:results => [
{
:name => "TLCparam",
:value => 10
},
{
:name => "Nparam",
:value => 23
},
{
:name => "ANCparam",
:value => 25
},
{
:name => "Lparam",
:value => 10
},
{
:name => "ALCparam",
:value => 44
},
{
:name => "Mparam",
:value => 55
},
{
:name => "AMCparam",
:value => 22
},
{
:name => "Eparam",
:value => 222
},
{
:name => "AECparam",
:value => 21
},
{
:name => "BASOparam",
:value => 222
},
{
:name => "ABCparam",
:value => 300
},
{
:name => "RBCparam",
:value => 2.22
},
{
:name => "HBparam",
:value => 19
},
{
:name => "HCTparam",
:value => 22
},
{
:name => "MCVparam",
:value => 222
},
{
:name => "MCHparam",
:value => 21
},
{
:name => "MCHCparam",
:value => 10
},
{
:name => "MCVparam",
:value => 222
},
{
:name => "RDWCVparam",
:value => 12
},
{
:name => "PCparam",
:value => 1.22322
}
]
}
]
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 |
# File 'lib/publisher/pf_lab_interface.rb', line 388 def process_update_queue #puts "came to process update queue." order_ids = [] ORDERS_TO_UPDATE_PER_CYCLE.times do |n| order_ids << $redis.rpop(UPDATE_QUEUE) end #puts "order ids popped" #puts order_ids.to_s orders = order_ids.map{|c| get_order(c) }.compact #puts "orders are:" #puts orders.to_s req = Typhoeus::Request.new(PUT_URL_PATH, method: :put, body: {orders: orders}.to_json, params: {lis_security_key: self.lis_security_key}, headers: {Accept: 'application/json', "Content-Type".to_sym => 'application/json'}) req.on_complete do |response| if response.success? response_body = response.body orders = JSON.parse(response.body)["orders"] orders.each do |order| if order["errors"].blank? else puts "got an error for the order." ## how many total error attempts to manage. end end elsif response.timed_out? AstmServer.log("got a time out") elsif response.code == 0 AstmServer.log(response.) else AstmServer.log("HTTP request failed: " + response.code.to_s) end end req.run end |
#queue_order_for_update(order) ⇒ Object
184 185 186 |
# File 'lib/publisher/pf_lab_interface.rb', line 184 def queue_order_for_update(order) $redis.lpush(UPDATE_QUEUE,order[ID.to_sym]) end |
#remove_barcode(barcode) ⇒ Object
94 95 96 97 |
# File 'lib/publisher/pf_lab_interface.rb', line 94 def () return if .blank? $redis.hdel(BARCODES,) end |
#remove_old_orders ⇒ Object
removes any orders that start from now - 4 days ago now - 2 days ago
434 435 436 437 438 439 440 441 |
# File 'lib/publisher/pf_lab_interface.rb', line 434 def remove_old_orders stale_order_ids = $redis.zrangebyscore(ORDERS_SORTED_SET,(Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS*2).to_s, (Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS)) $redis.pipelined do stale_order_ids.each do |order_id| remove_order(order_id) end end end |
#remove_order(order_id) ⇒ Object
UTILITY METHOD FOR THE ORDER AND BARCODE HASHES ADD AND REMOVE
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/publisher/pf_lab_interface.rb', line 82 def remove_order(order_id) order = get_order(order_id) order["reports"].each do |report| report["tests"].each do |test| (test["barcode"]) (test["code"]) end end $redis.hdel(ORDERS,order[ID]) $redis.zrem(ORDERS_SORTED_SET,order[ID]) end |
#request_size_completed?(response_hash) ⇒ Boolean
since we request only a certain set of orders per request we need to know if the earlier request has been completed or we still need to rerequest the same time frame again.
245 246 247 |
# File 'lib/publisher/pf_lab_interface.rb', line 245 def request_size_completed?(response_hash) response_hash[SKIP].to_i + response_hash[ORDERS].size >= response_hash[SIZE] end |
#update(data) ⇒ Object
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/publisher/pf_lab_interface.rb', line 443 def update(data) data.each do |result| = result[:id] results = result[:results] if = () if order = get_order([:order_id]) ## update the test results, and add the order to the final update hash. puts "order got from barcode is:" puts order machine_codes = [:machine_codes] ## it has to be registered on this. results.each do |res| if machine_codes.include? res[:name] ## so we need to update to the requisite test inside the order. add_test_result(order,res) ## commit to redis ## and then end end puts "came to queue order for update" queue_order_for_update(order) end else AstmServer.log("the barcode:#{}, does not exist in the barcodes hash") ## does not exist. end end process_update_queue remove_old_orders end |