Class: Pf_Lab_Interface

Inherits:
Poller
  • Object
show all
Defined in:
lib/publisher/pf_lab_interface.rb

Constant Summary collapse

ORDERS =
"orders"
ORDERS_SORTED_SET =
"orders_sorted_set"
BARCODES =
"barcodes"
BARCODE =
"barcode"
BASE_URL =
"http://localhost:3000/"
UPDATE_QUEUE =
"update_queue"
DEFAULT_LOOK_BACK_IN_SECONDS =

will look back 12 hours if no previous request is found.

12*3600
DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS =

Time to keep old orders in memory: 48 hours, expressed in seconds.

48*3600
POLL_URL_PATH =

the last request that was made and what it said.

BASE_URL + "interfaces"
PUT_URL_PATH =
BASE_URL + "lis_update_orders"
LAST_REQUEST =
"last_request"
FROM_EPOCH =
"from_epoch"
TO_EPOCH =
"to_epoch"
SIZE =
"size"
SKIP =
"skip"
ID =
"id"
REPORTS =
"reports"
TESTS =
"tests"
RESULT_RAW =
"result_raw"
CATEGORIES =
"categories"
USE_CATEGORY_FOR_LIS =
"use_category_for_lis"
LIS_CODE =
"lis_code"
REQUIREMENTS =
"requirements"
ITEMS =
"items"
CODE =
"code"
ORDERS_TO_UPDATE_PER_CYCLE =
10

Constants inherited from Poller

Poller::COMPLETED, Poller::EDTA, Poller::ESR, Poller::FLUORIDE, Poller::LAST_REQUEST_AT, Poller::LAST_REQUEST_STATUS, Poller::PLASMA, Poller::POLL_STATUS_KEY, Poller::REQUISITIONS_HASH, Poller::REQUISITIONS_SORTED_SET, Poller::RUNNING, Poller::SERUM, Poller::URINE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Poller

#assign_tube, #build_tests_hash, #default_checkpoint, #determine_tube, #get_checkpoint, #merge_with_requisitions_hash, #post_poll_LIS, #pre_poll_LIS, #prepare_redis, #process_LIS_response, #root_path, #update_LIS

Constructor Details

#initialize(mpg = nil, lis_security_key) ⇒ Pf_Lab_Interface

METHODS OVERRIDDEN FROM THE BASIC POLLER.

@param mpg : path to mappings file. Defaults to nil. @param lis_security_key : the security key for the LIS organization, to be downloaded from the organizations/show/id endpoint in the website.



266
267
268
269
270
# File 'lib/publisher/pf_lab_interface.rb', line 266

# Builds the lab interface poller.
#
# @param mpg : path to the mappings file (nil by default); forwarded unchanged
#   to the parent constructor.
# @param lis_security_key : security key of the LIS organization, kept on the
#   instance through the lis_security_key writer.
def initialize(mpg = nil, lis_security_key)
    # The parent class only knows about the mappings path.
    super(mpg)
    self.lis_security_key = lis_security_key
    AstmServer.log("Initialized Lab Interface")
end

Instance Attribute Details

#lis_security_keyObject

Returns the value of attribute lis_security_key.



38
39
40
# File 'lib/publisher/pf_lab_interface.rb', line 38

# Reader for the LIS security key.
#
# @return the current value of @lis_security_key (nil until it has been assigned).
def lis_security_key
  @lis_security_key
end

Instance Method Details

#add_barcode(code, order_id) ⇒ Object



188
189
190
# File 'lib/publisher/pf_lab_interface.rb', line 188

# Writes one entry into the BARCODES redis hash.
#
# @param code : the barcode (or item code) used as the hash field.
# @param order_id : the value stored for that field. add_order passes a JSON
#   string carrying the order id and the machine codes, not a bare id.
# @return whatever $redis.hset returns.
def add_barcode(code, order_id)
	$redis.hset(BARCODES, code, order_id)
end

#add_order(order) ⇒ Object

@param order : order object, as a hash.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/publisher/pf_lab_interface.rb', line 134

# Registers an order received from the LIS: indexes its barcodes, then stores
# the order itself.
#
# For every report the machine codes of its tests are resolved through
# $inverted_mappings (keyed by LIS code). Every item of each requirement's
# priority category is then registered in the barcodes hash under its barcode,
# or under its code when the barcode is blank, with a JSON payload holding the
# order id and those machine codes. Finally the order is written to the ORDERS
# hash and time-stamped (Time.now) in ORDERS_SORTED_SET.
#
# Reports without tests or requirements, categories without items, and
# requirements for which get_priority_category returns nil are skipped rather
# than raising.
#
# NOTE: the redis writes are still not atomic; ideally the whole method would
# run in one transaction.
#
# @param order : order object, as a string-keyed hash.
def add_order(order)
	(order[REPORTS] || []).each do |report|
		test_machine_codes = (report[TESTS] || []).map { |test|
			$inverted_mappings[test[LIS_CODE]]
		}.compact.uniq
		# The payload is identical for every barcode of this report: build it once.
		barcode_payload = JSON.generate({
			:order_id => order[ID],
			:machine_codes => test_machine_codes
		})
		(report[REQUIREMENTS] || []).each do |req|
			category = get_priority_category(req)
			next if category.nil?
			(category[ITEMS] || []).each do |item|
				# Prefer the barcode, fall back to the item code.
				key = [item[BARCODE], item[CODE]].find { |candidate| !candidate.blank? }
				add_barcode(key, barcode_payload) unless key.nil?
			end
		end
	end
	$redis.hset(ORDERS, order[ID], JSON.generate(order))
	$redis.zadd(ORDERS_SORTED_SET, Time.now.to_i, order[ID])
end

#add_test_result(order, res) ⇒ Object

@param order : the existing order @param res : the result from the machine, pertaining to this order. @return $MAPPINGS -> [MACHINE_CODE => LIS_CODE] $INVERTED_MAPPINGS -> [LIS_CODE => MACHINE_CODE]



174
175
176
177
178
179
180
181
182
# File 'lib/publisher/pf_lab_interface.rb', line 174

# Copies one machine result into every matching test of the order.
#
# A test matches when its LIS code equals the entry that $mappings holds for
# the result's machine code (res[:name]); matching tests get res[:value]
# stored as their raw result. The order is modified in place.
#
# @param order : the existing order, as a symbol-keyed hash.
# @param res : the result from the machine (reads :name and :value).
# @return the order's reports collection.
def add_test_result(order, res)
	order[REPORTS.to_sym].each do |report|
		report[TESTS.to_sym].each do |test|
			next unless test[LIS_CODE.to_sym] == $mappings[res[:name]]
			test[RESULT_RAW.to_sym] = res[:value]
		end
	end
end

#all_hits_downloaded?(last_request) ⇒ Boolean

def delete_last_request $redis.del(LAST_REQUEST) end

Returns:

  • (Boolean)


201
202
203
# File 'lib/publisher/pf_lab_interface.rb', line 201

# Tells whether every hit of the previously polled time window has been fetched.
#
# commit_request_params_to_redis stores under SKIP the number of orders
# received so far and under SIZE the size reported by the LIS (presumably the
# total number of hits for the window), so the window is exhausted once SKIP
# has reached SIZE. The previous FROM_EPOCH == SIZE comparison matched a
# timestamp against a count and was practically never true.
#
# @param last_request : the stored request parameters (string keys; values
#   read back from redis are strings, hence the to_i).
# @return [Boolean]
def all_hits_downloaded?(last_request)
	last_request[SKIP].to_i >= last_request[SIZE].to_i
end

#build_requestObject



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/publisher/pf_lab_interface.rb', line 213

# Builds the (not yet run) Typhoeus request that polls the LIS for orders.
#
# Parameter selection:
# * no stored last request       -> fresh window (fresh_request_params default);
# * last window fully downloaded -> fresh window starting where the last one ended;
# * otherwise                    -> the stored parameters again, to keep paging
#                                   through the same window.
#
# The stored request comes from a redis hash (hgetall), so its keys are
# strings: the end of the last window is read with TO_EPOCH. The former
# :to_epoch symbol lookup always produced nil, which silently reset the window
# to the default look back.
#
# @return [Typhoeus::Request] request for POLL_URL_PATH carrying the chosen
#   params plus the lis_security_key.
def build_request
	last_request = get_last_request
	params =
		if last_request.blank?
			fresh_request_params
		elsif all_hits_downloaded?(last_request)
			previous_end = last_request[TO_EPOCH]
			# A missing value must stay nil: 0 is truthy and would query from epoch 0.
			fresh_request_params(previous_end.blank? ? nil : previous_end.to_i)
		else
			last_request
		end
	params.merge!(lis_security_key: self.lis_security_key)
	Typhoeus::Request.new(POLL_URL_PATH, params: params)
end

#commit_request_params_to_redis(response_hash) ⇒ Object

commits the request params to redis. the response hash is expected to have whatever parameters were sent into it in the request. so it must always return: a -> how many it was told to skip (SKIP) b -> from_epoch : from which epoch it was queried. c -> to_epoch : to which epoch it was queried.



235
236
237
238
239
240
# File 'lib/publisher/pf_lab_interface.rb', line 235

# Persists the paging state of the poll that just completed into the
# LAST_REQUEST redis hash, so that the next poll can continue from it.
#
# Stored fields (all as integers):
# * SKIP       - the skip echoed by the response plus the number of orders it returned;
# * SIZE       - the size echoed by the response;
# * FROM_EPOCH - start of the queried window;
# * TO_EPOCH   - end of the queried window.
#
# @param response_hash : parsed response; must contain an ORDERS collection.
# @return the result of the last hset (the TO_EPOCH write).
def commit_request_params_to_redis(response_hash)
	# The next page starts after everything received so far.
	next_skip = response_hash[SKIP].to_i + response_hash[ORDERS].size.to_i
	$redis.hset(LAST_REQUEST, SKIP, next_skip)
	# The remaining fields are copied over as integers, in the same order as before.
	[SIZE, FROM_EPOCH, TO_EPOCH].map { |field|
		$redis.hset(LAST_REQUEST, field, response_hash[field].to_i)
	}.last
end

#fresh_request_params(from_epoch = nil) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/publisher/pf_lab_interface.rb', line 205

# Builds the parameters for a brand-new polling window that ends now.
#
# @param from_epoch : start of the window as epoch seconds; when nil the
#   window starts DEFAULT_LOOK_BACK_IN_SECONDS before its end.
# @return [Hash] string-keyed hash with TO_EPOCH, FROM_EPOCH and SKIP (always 0).
def fresh_request_params(from_epoch = nil)
	window_end = Time.now.to_i
	window_start = from_epoch || (window_end - DEFAULT_LOOK_BACK_IN_SECONDS)
	{
		TO_EPOCH => window_end,
		FROM_EPOCH => window_start,
		SKIP => 0
	}
end

#get_barcode(barcode) ⇒ Object

@return the entry at the barcode, or nil. key (order_id) value (array of tests registered on that barcode, the names of the tests are the machine codes, and not the lis_codes) this key is generated originally in add_barcode



103
104
105
106
107
108
109
# File 'lib/publisher/pf_lab_interface.rb', line 103

# Looks up a barcode in the BARCODES redis hash.
#
# @param barcode : the barcode (hash field) to look up.
# @return the stored JSON entry parsed into a symbol-keyed hash (add_order
#   stores :order_id and :machine_codes there), or nil when the barcode is
#   not registered.
def get_barcode(barcode)
	stored_entry = $redis.hget(BARCODES, barcode)
	return nil unless stored_entry
	JSON.parse(stored_entry).deep_symbolize_keys
end

#get_last_requestObject



192
193
194
# File 'lib/publisher/pf_lab_interface.rb', line 192

# Reads the parameters of the last poll from the LAST_REQUEST redis hash.
#
# @return whatever $redis.hgetall returns for that key (the fields are
#   written by commit_request_params_to_redis).
def get_last_request
	$redis.hgetall(LAST_REQUEST)
end

#get_order(order_id) ⇒ Object



111
112
113
114
115
116
117
# File 'lib/publisher/pf_lab_interface.rb', line 111

# Fetches a stored order from the ORDERS redis hash.
#
# @param order_id : id of the order (hash field).
# @return the order parsed from JSON with all keys symbolized, or nil when no
#   order is stored under that id.
def get_order(order_id)
	order_json = $redis.hget(ORDERS, order_id)
	order_json ? JSON.parse(order_json).deep_symbolize_keys : nil
end

#get_priority_category(req) ⇒ Object

@param req : the requirement hash. @return priority_category : the category which has been chosen as the top priority for the requirement.



121
122
123
124
125
126
127
128
129
130
131
# File 'lib/publisher/pf_lab_interface.rb', line 121

# Picks the category of a requirement that should be used for the LIS.
#
# The first category flagged with USE_CATEGORY_FOR_LIS == 1 wins; when no
# category carries the flag, the first category of the requirement is used.
#
# @param req : the requirement hash (string keys).
# @return the chosen category, or nil when the requirement has no categories.
def get_priority_category(req)
	flagged = req[CATEGORIES].select { |category| category[USE_CATEGORY_FOR_LIS] == 1 }
	flagged.blank? ? req[CATEGORIES][0] : flagged[0]
end

#pollObject



476
477
478
479
480
481
# File 'lib/publisher/pf_lab_interface.rb', line 476

def poll
   	pre_poll_LIS
   	poll_LIS_for_requisition
   	update_LIS
   	post_poll_LIS
end

#poll_LIS_for_requisitionObject



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/publisher/pf_lab_interface.rb', line 272

# Polls the LIS for orders and stores whatever it returns.
#
# On a successful response every returned order goes through add_order and
# the paging state is then saved with commit_request_params_to_redis.
# A body that is not valid JSON, or that carries no orders list, is logged and
# ignored instead of raising out of the completion callback; nothing is
# committed in that case. Time-outs, transport failures (code 0) and
# non-successful HTTP statuses are logged.
#
# @return the result of running the Typhoeus request.
def poll_LIS_for_requisition
	AstmServer.log("Polling LIS at url:#{BASE_URL}")
	request = build_request
	request.on_complete do |response|
		if response.success?
			begin
				response_hash = JSON.parse(response.body)
				orders = response_hash.is_a?(Hash) ? response_hash[ORDERS] : nil
				if orders.is_a?(Array)
					orders.each { |order| add_order(order) }
					# Only remember the paging state once the orders are stored.
					commit_request_params_to_redis(response_hash)
				else
					AstmServer.log("LIS response did not contain an orders list")
				end
			rescue JSON::ParserError => e
				AstmServer.log("Could not parse LIS response: #{e.message}")
			end
		elsif response.timed_out?
			AstmServer.log("Polling time out")
		elsif response.code == 0
			# Could not get an http response, something's wrong.
			AstmServer.log(response.return_message)
		else
			# Received a non-successful http response.
			AstmServer.log("HTTP request failed: " + response.code.to_s)
		end
	end
	request.run
end

#process_update_queueObject

data = [

  {
    :id => "ARUBA",
    :results => [
      {
        :name => "TLCparam",
        :value => 10
      },
      {
        :name => "Nparam",
        :value => 23
      },
      {
        :name => "ANCparam",
        :value => 25
      },
      {
        :name => "Lparam",
        :value => 10
      },
      {
        :name => "ALCparam",
        :value => 44
      },
      {
        :name => "Mparam",
        :value => 55
      },
      {
        :name => "AMCparam",
        :value => 22
      },
      {
        :name => "Eparam",
        :value => 222
      },
      {
        :name => "AECparam",
        :value => 21
      },
      {
        :name => "BASOparam",
        :value => 222
      },
      {
        :name => "ABCparam",
        :value => 300
      },
      {
        :name => "RBCparam",
        :value => 2.22
      },
      {
        :name => "HBparam",
        :value => 19
      },
      {
        :name => "HCTparam",
        :value => 22
      },
      {
        :name => "MCVparam",
        :value => 222
      },
      {
        :name => "MCHparam",
        :value => 21
      },
      {
        :name => "MCHCparam",
        :value => 10
      },
      {
        :name => "MCVparam",
        :value => 222
      },
      {
        :name => "RDWCVparam",
        :value => 12
      },
      {
        :name => "PCparam",
        :value => 1.22322
      }
    ]
  }
]


388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/publisher/pf_lab_interface.rb', line 388

# Sends a batch of updated orders to the LIS.
#
# Up to ORDERS_TO_UPDATE_PER_CYCLE order ids are popped from UPDATE_QUEUE,
# the corresponding orders are loaded with get_order and PUT as JSON to
# PUT_URL_PATH together with the lis_security_key.
#
# Popping stops as soon as the queue is empty (rpop returned nil), so no nil
# ids are looked up, and no request is made when there is nothing to send.
#
# NOTE: ids are removed from the queue before the request runs and are not
# re-queued when the request fails or the LIS reports errors for an order.
#
# @return nil when there was nothing to send, otherwise the result of running
#   the request.
def process_update_queue
	order_ids = []
	ORDERS_TO_UPDATE_PER_CYCLE.times do
		order_id = $redis.rpop(UPDATE_QUEUE)
		break if order_id.nil?
		order_ids << order_id
	end

	# Ids whose order is no longer stored are silently dropped.
	orders = order_ids.map { |order_id| get_order(order_id) }.compact
	return if orders.empty?

	req = Typhoeus::Request.new(
		PUT_URL_PATH,
		method: :put,
		body: {orders: orders}.to_json,
		params: {lis_security_key: self.lis_security_key},
		headers: {Accept: 'application/json', "Content-Type".to_sym => 'application/json'}
	)

	req.on_complete do |response|
		if response.success?
			# A reply without an "orders" list is treated as "no per-order errors".
			Array(JSON.parse(response.body)["orders"]).each do |returned_order|
				next if returned_order["errors"].blank?
				puts "got an error for the order."
				## how many total error attempts to manage.
			end
		elsif response.timed_out?
			AstmServer.log("got a time out")
		elsif response.code == 0
			AstmServer.log(response.return_message)
		else
			AstmServer.log("HTTP request failed: " + response.code.to_s)
		end
	end

	req.run
end

#queue_order_for_update(order) ⇒ Object



184
185
186
# File 'lib/publisher/pf_lab_interface.rb', line 184

# Puts an order on the update queue by pushing its id onto the UPDATE_QUEUE
# redis list.
#
# @param order : symbol-keyed order hash; only its id is read.
# @return whatever $redis.lpush returns.
def queue_order_for_update(order)
	order_id = order[ID.to_sym]
	$redis.lpush(UPDATE_QUEUE, order_id)
end

#remove_barcode(barcode) ⇒ Object



94
95
96
97
# File 'lib/publisher/pf_lab_interface.rb', line 94

# Deletes a barcode from the BARCODES redis hash.
#
# @param barcode : the barcode (hash field) to delete; blank values are ignored.
# @return nil for a blank barcode, otherwise whatever $redis.hdel returns.
def remove_barcode(barcode)
	$redis.hdel(BARCODES, barcode) unless barcode.blank?
end

#remove_old_ordersObject

removes any orders that start from now - 4 days ago now - 2 days ago



434
435
436
437
438
439
440
441
# File 'lib/publisher/pf_lab_interface.rb', line 434

# Removes every order that has been stored for longer than
# DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS.
#
# Stale ids are taken from ORDERS_SORTED_SET (scored by insertion time in
# add_order) and handed to remove_order one by one.
#
# Two changes compared to the previous version:
# * the lower bound is "-inf" instead of "twice the storage time ago", so
#   orders that were missed by earlier clean-ups no longer stay forever;
# * the removals are no longer wrapped in $redis.pipelined: remove_order has
#   to read the order back before deleting it, and commands issued inside a
#   redis-rb pipeline are only queued, so that read would not return a value.
#
# @return the list of order ids that were handed to remove_order.
def remove_old_orders
	cutoff = Time.now.to_i - DEFAULT_STORAGE_TIME_FOR_ORDERS_IN_SECONDS
	stale_order_ids = $redis.zrangebyscore(ORDERS_SORTED_SET, "-inf", cutoff)
	stale_order_ids.each do |order_id|
		remove_order(order_id)
	end
end

#remove_order(order_id) ⇒ Object

UTILITY METHOD FOR THE ORDER AND BARCODE HASHES ADD AND REMOVE



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/publisher/pf_lab_interface.rb', line 82

# Removes an order and the barcodes registered for it.
#
# get_order returns the order with symbolized keys, so the order is walked
# with symbol keys here (the former string-key lookups returned nil and
# raised). Barcodes are removed from the same place add_order registers them:
# the items of each requirement's priority category (first category flagged
# with USE_CATEGORY_FOR_LIS == 1, else the first category), using the item's
# barcode or, when that is blank, its code.
#
# The ORDERS hash entry and the ORDERS_SORTED_SET member are deleted by the
# given id even when the order itself can no longer be loaded, so stale ids do
# not linger in the sorted set.
#
# @param order_id : id of the order to remove.
# @return whatever $redis.zrem returns.
def remove_order(order_id)
	order = get_order(order_id)
	unless order.nil?
		(order[REPORTS.to_sym] || []).each do |report|
			(report[REQUIREMENTS.to_sym] || []).each do |req|
				categories = req[CATEGORIES.to_sym] || []
				category = categories.find { |c| c[USE_CATEGORY_FOR_LIS.to_sym] == 1 } || categories.first
				next if category.nil?
				(category[ITEMS.to_sym] || []).each do |item|
					key = item[BARCODE.to_sym]
					key = item[CODE.to_sym] if key.blank?
					# remove_barcode ignores blank keys.
					remove_barcode(key)
				end
			end
		end
	end
	$redis.hdel(ORDERS, order_id)
	$redis.zrem(ORDERS_SORTED_SET, order_id)
end

#request_size_completed?(response_hash) ⇒ Boolean

since we request only a certain set of orders per request we need to know if the earlier request has been completed or we still need to rerequest the same time frame again.

Returns:

  • (Boolean)


245
246
247
# File 'lib/publisher/pf_lab_interface.rb', line 245

# Tells whether a response completes its time window, i.e. whether the orders
# skipped so far plus the orders in this response reach the reported size.
# When it does not, the same time frame has to be requested again.
#
# SIZE is coerced with to_i, like SKIP and like commit_request_params_to_redis
# does, so a size that arrives as a string no longer raises on comparison.
#
# @param response_hash : parsed response with SKIP, SIZE and an ORDERS collection.
# @return [Boolean]
def request_size_completed?(response_hash)
	received_so_far = response_hash[SKIP].to_i + response_hash[ORDERS].size
	received_so_far >= response_hash[SIZE].to_i
end

#update(data) ⇒ Object



443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/publisher/pf_lab_interface.rb', line 443

# Merges results coming from the machines into the stored orders and pushes
# the updated orders towards the LIS.
#
# For every entry of data the barcode (entry[:id]) is resolved to its order
# through get_barcode / get_order. Each result whose :name is one of the
# machine codes registered for that barcode is merged with add_test_result.
# The updated order is then written back to the ORDERS hash, since
# process_update_queue re-reads orders from redis and would otherwise send
# them without the new results, and finally queued for update.
# Unknown barcodes are logged; barcodes whose order is gone are skipped.
#
# After all entries are handled the update queue is processed and old orders
# are cleaned up.
#
# @param data : array of hashes with :id (barcode) and :results (array of
#   hashes with :name and :value).
def update(data)
	data.each do |result|
		barcode = result[:id]
		barcode_hash = get_barcode(barcode)
		if barcode_hash.nil?
			AstmServer.log("the barcode:#{barcode}, does not exist in the barcodes hash")
			next
		end

		order = get_order(barcode_hash[:order_id])
		next if order.nil?

		# Only tests registered on this barcode may be updated from it.
		machine_codes = Array(barcode_hash[:machine_codes])
		Array(result[:results]).each do |res|
			add_test_result(order, res) if machine_codes.include?(res[:name])
		end

		# Persist before queueing: the queue only carries the order id.
		$redis.hset(ORDERS, order[ID.to_sym], JSON.generate(order))
		queue_order_for_update(order)
	end

	process_update_queue
	remove_old_orders
end

#update_order(order) ⇒ Object

start work on simple.



164
165
166
# File 'lib/publisher/pf_lab_interface.rb', line 164

# Overwrites the stored copy of an order in the ORDERS redis hash.
#
# The id is read with the string key first and, failing that, with the symbol
# key, so both raw orders (string keys, as handled by add_order) and orders
# loaded through get_order (symbol keys) are stored under their own id rather
# than under nil.
#
# @param order : the order hash to store (serialized as JSON).
# @return whatever $redis.hset returns.
def update_order(order)
	order_id = order[ID] || order[ID.to_sym]
	$redis.hset(ORDERS, order_id, JSON.generate(order))
end