Class: Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/publisher/poller.rb

Direct Known Subclasses

Google_Lab_Interface

Constant Summary collapse

EDTA =

3

require the file that implements the Adapter class.

"EDTA"
SERUM =
"SERUM"
PLASMA =
"PLASMA"
FLUORIDE =
"FLUORIDE"
ESR =
"ESR"
URINE =
"URINE"
REQUISITIONS_SORTED_SET =
"requisitions_sorted_set"
REQUISITIONS_HASH =
"requisitions_hash"
POLL_STATUS_KEY =
"ruby_astm_lis_poller"
LAST_REQUEST_AT =
"last_request_at"
LAST_REQUEST_STATUS =
"last_request_status"
RUNNING =
"running"
COMPLETED =
"completed"

Instance Method Summary collapse

Constructor Details

#initialize(mpg = nil) ⇒ Poller

Returns a new instance of Poller.



27
28
29
30
31
32
33
# File 'lib/publisher/poller.rb', line 27

# Opens the Redis connection and loads the test-code mappings into globals.
#
# @param mpg [String, nil] path to a JSON mappings file; when nil the path
#   returned by AstmServer.default_mappings is read instead.
def initialize(mpg=nil)
  $redis = Redis.new

  ## the mappings file is keyed by MACHINE CODE.
  mappings_path = mpg || AstmServer.default_mappings
  $mappings = JSON.parse(IO.read(mappings_path))

  ## the inverted table is keyed by LIS CODE and points back at the machine code.
  $inverted_mappings = $mappings.each_with_object({}) do |(machine_code, details), by_lis_code|
    by_lis_code[details["LIS_CODE"]] = machine_code
  end
end

Instance Method Details

#build_tests_hash(record) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/publisher/poller.rb', line 108

# Groups the tests requested in one LIS record by the tube they run on.
#
# @param record [Array] one requisition row. Index 8 holds the
#   comma-separated LIS test codes; indices 28..33 hold the barcodes of the
#   EDTA, SERUM, PLASMA, FLUORIDE, URINE and ESR tubes respectively.
# @return [Hash{String => Array<String>}] "TUBE:barcode" => machine codes of
#   the tests to run on that tube. Tubes without a barcode get no entry.
def build_tests_hash(record)
  ## tube name -> column of the record that carries its barcode.
  tube_columns = {
    EDTA => 28,
    SERUM => 29,
    PLASMA => 30,
    FLUORIDE => 31,
    URINE => 32,
    ESR => 33
  }

  tests_hash = {}
  ## tube name -> its "TUBE:barcode" key in tests_hash.
  tube_keys = {}

  tube_columns.each do |tube_name, column|
    barcode = record[column]
    ## blank? is not core Ruby; use it when something has defined it and
    ## otherwise treat nil / whitespace-only values as "no barcode".
    missing = barcode.respond_to?(:blank?) ? barcode.blank? : barcode.to_s.strip.empty?
    next if missing

    key = "#{tube_name}:#{barcode}"
    tube_keys[tube_name] = key
    tests_hash[key] = []
  end

  ## a record without any requested tests yields no iterations instead of
  ## failing on nil.
  record[8].to_s.split(",").each do |test|
    machine_code = $inverted_mappings[test]
    unless machine_code
      AstmServer.log("ERROR: Test: #{test} does not have an LIS code")
      next
    end

    ## mappings have to match the tubes defined in this file.
    tube = $mappings[machine_code]["TUBE"]

    ## exact lookup by tube name: a pattern match over the keys would also
    ## hit barcodes that merely contain the tube's name.
    tube_key = tube_keys[tube]
    if tube_key
      tests_hash[tube_key] << machine_code
    else
      AstmServer.log("ERROR: Test: #{test} needs tube #{tube} but the record has no barcode for it")
    end
  end

  AstmServer.log("tests hash generated")
  AstmServer.log(JSON.generate(tests_hash))
  tests_hash
end

#merge_with_requisitions_hash(epoch, tests) ⇒ Object

@param epoch: the epoch at which these tests were requested.
@param tests: => [MCV, MCH, MCHC, …]



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/publisher/poller.rb', line 175

# Stores one requisition in Redis inside a single MULTI transaction.
#
# The whole tests hash is added to the REQUISITIONS_SORTED_SET sorted set
# (as JSON, scored by epoch), and each tube's test list is written to the
# REQUISITIONS_HASH hash under its "TUBE:barcode" key (as JSON).
#
# @param epoch the score to store the requisition under (the time at which
#   the tests were requested).
# @param tests [Hash] "TUBE:barcode" => array of machine codes.
# @return whatever $redis.multi returns.
def merge_with_requisitions_hash(epoch,tests)
  $redis.multi do |multi|
    ## commands must go to the yielded transaction object so that they are
    ## queued in the MULTI, like the set calls in pre/post_poll_LIS.
    multi.zadd(REQUISITIONS_SORTED_SET, epoch, JSON.generate(tests))
    tests.each do |tube_barcode, machine_codes|
      multi.hset(REQUISITIONS_HASH, tube_barcode, JSON.generate(machine_codes))
    end
  end
end

#poll ⇒ Object



332
333
334
335
336
337
# File 'lib/publisher/poller.rb', line 332

# Runs one complete polling cycle, in order:
# mark the poll as started, fetch requisitions from the LIS, push pending
# patient results, then mark the poll as finished.
#
# @return whatever post_poll_LIS returns.
def poll
  ## 1. flag the poll as running in redis.
  pre_poll_LIS()
  ## 2. download requisitions.
  poll_LIS_for_requisition()
  ## 3. upload queued results.
  update_LIS()
  ## 4. flag the poll as completed.
  post_poll_LIS()
end

#poll_LIS_for_requisition ⇒ Object

Polls the LIS and stores the following locally, in a Redis sorted set: key => specimen_id; value => the tests designated for that specimen; score => the time of requisition of that specimen. The name of the sorted set can be defined in the class that inherits from the adapter, or will default to “requisitions”. When a query is sent from any laboratory equipment to the local ASTMServer, the server queries the Redis sorted set for the test information. So this poller basically replicates the cloud-based test information to the local server, constantly.



328
329
330
# File 'lib/publisher/poller.rb', line 328

# Hook called by #poll to download requisitions from the LIS.
# This implementation does nothing and returns nil.
def poll_LIS_for_requisition
  ## intentionally empty.
  nil
end

#post_poll_LIS ⇒ Object

Uses Redis CAS (check-and-set) to ensure that two requests don’t overlap. Will update the requisitions hash with the specimen id. Developer note: now let's test this. How should it be stubbed out? As a first step, we call it directly.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/publisher/poller.rb', line 80

# Marks the current poll as COMPLETED in Redis, using WATCH/MULTI so the
# status is only changed if nobody else touched it in between.
#
# Does nothing (apart from logging) when the status key is absent or the
# recorded status is not RUNNING. LAST_REQUEST_AT is carried over unchanged.
def post_poll_LIS
  raw_status = $redis.get(POLL_STATUS_KEY)

  ## no status recorded yet: there is nothing to complete (and nothing to parse).
  if raw_status.nil?
    AstmServer.log("post poll LIS was not in running state")
    return
  end

  requisition_status = JSON.parse(raw_status)

  unless requisition_status[LAST_REQUEST_STATUS] == RUNNING
    AstmServer.log("post poll LIS was not in running state")
    return
  end

  $redis.watch(POLL_STATUS_KEY) do
    ## compare against the exact bytes read above rather than a re-serialised copy.
    if $redis.get(POLL_STATUS_KEY) == raw_status
      committed = $redis.multi do |multi|
        multi.set(POLL_STATUS_KEY,JSON.generate({LAST_REQUEST_STATUS => COMPLETED, LAST_REQUEST_AT => requisition_status[LAST_REQUEST_AT]}))
      end

      ## NOTE(review): assumes multi returns nil when the watched key changed
      ## and the transaction was discarded -- confirm for the redis-rb version in use.
      if committed
        AstmServer.log("post poll LIS status set to completed")
      else
        AstmServer.log("post poll LIS was was interrupted by another client , so exited this thread")
      end
    else
      AstmServer.log("post poll LIS was was interrupted by another client , so exited this thread")
      ## unwatch takes no arguments (same call as in pre_poll_LIS).
      $redis.unwatch
      return
    end
  end
end

#pre_poll_LIS ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/publisher/poller.rb', line 46

# Marks a poll as RUNNING in Redis before it starts, using WATCH/MULTI so
# that two pollers do not both claim the poll.
#
# The status is set to RUNNING (with the current time as LAST_REQUEST_AT)
# when there is no previous status, when the previous status is not RUNNING,
# or when the previous RUNNING status is older than 600 seconds. If the
# status key changes between the initial read and the watched read, nothing
# is written.
def pre_poll_LIS
  raw_status = $redis.get(POLL_STATUS_KEY)

  last_request_at = nil
  last_request_status = nil

  if raw_status
    ## the stored value is a JSON document; it has to be parsed before its
    ## fields can be read (indexing the raw string only does substring lookup).
    previous_status = JSON.parse(raw_status)
    last_request_at = previous_status[LAST_REQUEST_AT]
    last_request_status = previous_status[LAST_REQUEST_STATUS]
  end

  running_time = Time.now.to_i

  $redis.watch(POLL_STATUS_KEY) do
    if $redis.get(POLL_STATUS_KEY) == raw_status
      ## to_i guards against a RUNNING entry that carries no timestamp.
      stale = (running_time - last_request_at.to_i) > 600
      if last_request_status != RUNNING || stale
        $redis.multi do |multi|
          multi.set(POLL_STATUS_KEY, JSON.generate({LAST_REQUEST_STATUS => RUNNING, LAST_REQUEST_AT => running_time}))
        end
        AstmServer.log("pre poll lis status set to running")
      else
        ## another poll is still running and not stale: leave the status
        ## alone and release the watch taken above.
        $redis.unwatch
      end
    else
      AstmServer.log("pre poll lis status check interrupted by another client, so exiting here")
      $redis.unwatch
      return
    end
  end
end

#prepare_redis ⇒ Object



40
41
42
43
44
# File 'lib/publisher/poller.rb', line 40

# Seeds the "processing" list with a JSON empty array when the key does not
# exist yet (i.e. when exists reports 0 for it).
#
# @return the result of lpush when the list was seeded, otherwise nil.
def prepare_redis
  return unless $redis.exists("processing") == 0

  $redis.lpush("processing", JSON.generate([]))
end

#process_LIS_response(json_response) ⇒ Object



199
200
201
202
203
204
205
206
# File 'lib/publisher/poller.rb', line 199

# Parses a requisitions payload downloaded from the LIS and stores it.
#
# For every epoch key in the parsed payload, the first record under that
# epoch is turned into a tests hash and merged into the requisitions store.
#
# @param json_response [String] JSON object: epoch => array of records.
# @return [Array] the epoch keys that were iterated.
def process_LIS_response(json_response)
  requisitions_by_epoch = JSON.parse(json_response)

  AstmServer.log("requisitions downloaded from LIS")
  AstmServer.log(JSON.generate(requisitions_by_epoch))

  requisitions_by_epoch.keys.each do |epoch|
    ## only the first record of each epoch is used.
    first_record = requisitions_by_epoch[epoch][0]
    tests_by_tube = build_tests_hash(first_record)
    merge_with_requisitions_hash(epoch, tests_by_tube)
  end
end

#root_path ⇒ Object



35
36
37
# File 'lib/publisher/poller.rb', line 35

# @return [String] the parent directory of the directory containing this file.
def root_path
  File.dirname(__dir__)
end

#update(data) ⇒ Object

override to define how the data is updated.



209
210
211
# File 'lib/publisher/poller.rb', line 209

# Hook that receives one parsed patient-results entry from #update_LIS.
# Override to define how the data is updated; this implementation does
# nothing and returns nil.
#
# @param data the JSON-decoded entry popped from the "patients" list.
def update(data)
  ## intentionally empty.
  nil
end

#update_LIS ⇒ Object

Pretty simple: if the value is not already there, it will be updated; otherwise it won’t be.



306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/publisher/poller.rb', line 306

# Drains the "patients" list: each entry is moved to the "processing" list
# with RPOPLPUSH, JSON-decoded and handed to #update.
#
# @return [nil]
def update_LIS
  prepare_redis

  ## rpoplpush returns nil once "patients" is empty, so that alone ends the
  ## loop; no separate length checks are needed.
  while (raw_results = $redis.rpoplpush("patients", "processing"))
    update(JSON.parse(raw_results))
  end
end