This is the second part of our blog-post series about anomaly detection from healthcare data. As described in part 1, our goal is to apply the personalized-PageRank algorithm to detect anomalies in healthcare payment records, specifically the publicly available Medicare-B dataset.
The similarity computation is performed in two steps: data preparation and similarity computation.
As described in part 1, the Medicare-B dataset includes a set of records documenting various characteristics of each transaction between medical providers and CMS (the Centers for Medicare & Medicaid Services), and we extract only 4 of those fields, namely:
While working on this problem we identified a non-trivial data quality issue around provider specialty: the “specialty” field in the original Medicare-B dataset is sometimes inaccurate, containing the wrong specialty value.
This data quality issue impacts our anomaly detection algorithm. To remediate this issue we performed some pre-processing using two additional datasets:
We implement a simple “specialty lookup” solution as follows:
We then go through the Medicare-B dataset and replace the provided specialty with a more accurate specialty from our lookup. As a further refinement, we filter out providers that are not individuals (i.e., we throw away organizations) and also those who have more than a single specialty. We allow ourselves these simplifications since this is not meant as a comprehensive solution, noting that a real implementation would ideally handle organizations and multi-specialty providers in a more principled way.
Since this pre-processing is simple and fast, we implemented it in Python with the script prep-data.py. We won’t go into the details of that code here – it’s available for review by the interested reader in the GitHub repository.
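The core of this pre-processing can be sketched roughly as follows. This is an illustrative outline, not the actual prep-data.py code – the function names, field layout, and the `"individual"` entity label are assumptions for the sketch:

```python
def build_specialty_lookup(taxonomy_rows):
    """Map NPI -> specialty, keeping only individual providers
    with exactly one specialty (organizations and multi-specialty
    providers are dropped, per the simplification above)."""
    lookup = {}
    multi = set()
    for npi, entity_type, specialty in taxonomy_rows:
        if entity_type != "individual":
            continue  # throw away organizations
        if npi in lookup and lookup[npi] != specialty:
            multi.add(npi)  # more than one specialty -> drop later
        else:
            lookup[npi] = specialty
    for npi in multi:
        del lookup[npi]
    return lookup

def clean_records(records, lookup):
    """Replace the dataset's specialty with the lookup value,
    dropping providers that are not in the lookup."""
    for rec in records:
        npi = rec["npi"]
        if npi in lookup:
            yield dict(rec, specialty=lookup[npi])
```
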
The script gen-graph.pig (along with udfs.py which contains a few Python UDFs) is used to compute similarity between each pair of providers and build the similarity graph.
The first part of the script performs some basic data munging. It loads the raw dataset and generates a mapping of each entity type (specialty, CPT code, NPI) to an integer value (0-based index), used later by SociaLite.
This is quite straightforward — after filtering out empty values, we use PIG’s DISTINCT and RANK operators to perform this task for each entity type, resulting in 3 output files:
Here is the PIG code to generate the “specialty” file:
RAW = load 'raw_data' using PigStorage('\t') as (
        npi: chararray, specialty: chararray,
        hcpcs_code: chararray, hcpcs_description: chararray,
        bene_day_srvc_cnt: long);
DATA = foreach RAW generate npi, specialty, hcpcs_code as cpt,
       hcpcs_description as cpt_desc, bene_day_srvc_cnt as count;
-- generate speciality file
SP1 = foreach DATA generate specialty;
SP2 = filter SP1 by specialty != '';
SP3 = distinct SP2 parallel 5;
SP4 = rank SP3 by * ASC DENSE;
SPECIALTY = foreach SP4 generate $1 as sp_name, (int)$0-1 as sp_index;
store SPECIALTY into 'medicare/specialty' using PigStorage('\t');
Note our usage of PIG’s “rank” operator to generate the 0-based index.
Similar code is used to generate hcpcs-code and npi-mapping.
In addition to these 3 files, we also generate the “npi-cpt-code” file, which contains tuples of the form:
Each tuple covers one provider NPI (with a known specialty) and CPT code; the “count” field reflects the number of distinct beneficiary/per-day services listed in the dataset. The PIG code below shows how:
-- generate final dataset with npi, specialty, CPT and count
DATA0 = filter DATA by NOT(cpt_desc MATCHES '.*[Oo]ffice/outpatient visit.*'); -- remove 'too common' CPT for regular office visit
DATA1 = join DATA0 by npi, NPI_MAPPING by npi using 'replicated';
DATA2 = join DATA1 by classification, SPECIALTY by sp_name using 'replicated';
DATA3 = join DATA2 by cpt, HCPCS by cpt_code using 'replicated';
DATA4 = foreach DATA3 generate npi_index, sp_index, cpt_index, count;
NPI_CPT_CODE = foreach (group DATA4 by (npi_index, sp_index, cpt_index) parallel 20)
generate group.npi_index as npi,
group.sp_index as specialty,
group.cpt_index as cpt_inx, SUM(DATA4.count) as count;
store NPI_CPT_CODE into 'medicare/npi-cpt-code' using PigStorage('\t');
Next, we compute the similarity score between each pair of providers. We use Cosine Similarity, which in our case results in a score between 0 and 1 (1 represents most similar and 0 not similar at all).
We use a threshold of 0.85, and create an edge in the graph between two providers if: (1) they have at least 2 CPT codes in common, and (2) their similarity score is above 0.85. We have experimented with a few threshold values, and found that our method also works well with similar values such as 0.75 or 0.8.
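For reference, the edge rule can be sketched in plain Python. The actual computation happens inside the Pig pipeline and its UDFs; the function names and sparse-dict representation here are illustrative:

```python
import math

def cosine_similarity(v1, v2):
    """Cosine similarity between two sparse vectors given as
    {cpt_code: count} dicts. Since counts are non-negative,
    the result is always between 0 and 1."""
    dot = sum(v1[c] * v2[c] for c in set(v1) & set(v2))
    norm1 = math.sqrt(sum(x * x for x in v1.values()))
    norm2 = math.sqrt(sum(x * x for x in v2.values()))
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot / (norm1 * norm2)

def similar_pair(v1, v2, threshold=0.85, min_common=2):
    """Edge rule: at least 2 CPT codes in common AND cosine
    similarity above the threshold."""
    return (len(set(v1) & set(v2)) >= min_common
            and cosine_similarity(v1, v2) > threshold)
```
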
Computing similarity for each pair of providers for a dataset of over 880,000 providers is a non-trivial task, resulting in quite a bit of computation. We utilize Hadoop to parallelize this computation, along with some heuristics to reduce the computation load. Let’s look at the code:
First, we set some Pig configuration parameters to make sure PIG compresses temporary output files, and also increase the default memory used by PIG to store bags as follows:
SET pig.tmpfilecompression true
SET pig.tmpfilecompression.codec gzip
SET pig.cachedbag.memusage 0.5
Before computing similarity, we apply a few heuristics to filter noisy CPT codes and NPIs out of our base dataset:
-- Filter out noisy CPT codes and noisy NPIs
CODE_GRP = group NPI_CPT_CODE by cpt_inx parallel 5;
CNT_PER_CODE = foreach CODE_GRP generate group as cpt_inx, SUM($1.count) as cpt_total;
VALID_CODES = filter CNT_PER_CODE by cpt_total <= 10000000; -- Only keep CPTs where total count <= 10M
JOINED = join NPI_CPT_CODE by cpt_inx, VALID_CODES by cpt_inx;
JOINED_WITH_VALID_CODES = foreach JOINED generate npi, NPI_CPT_CODE::cpt_inx as cpt_inx, count;
NPI_GRP = group JOINED_WITH_VALID_CODES by npi parallel 5;
CNT_PER_NPI = foreach NPI_GRP generate group as npi, COUNT($1) as npi_count;
VALID_NPIS = filter CNT_PER_NPI by npi_count >= 3;
JOINED2 = join JOINED_WITH_VALID_CODES by npi, VALID_NPIS by npi;
DATA = foreach JOINED2 generate VALID_NPIS::npi as npi, (int)JOINED_WITH_VALID_CODES::cpt_inx as cpt_inx, count;
Finally, we compute similarity between NPIs, and keep only those whose similarity score is above 0.85, following these steps:
The PIG code to accomplish above is:
-- Create PTS: set of tuples, for each NPI its vector of CPT codes and associated counts (aka cpt_vec)
GRP = group DATA by npi parallel 10;
PTS = foreach GRP generate group as npi, DATA.(cpt_inx, count) as cpt_vec;
-- GROUP PTS into clusters keyed by top shared CPT
PTS_TOP = foreach PTS generate npi, cpt_vec, FLATTEN(udfs.top_cpt(cpt_vec)) as (cpt_inx: int, count: int);
PTS_TOP_CPT = foreach PTS_TOP generate npi, cpt_vec, cpt_inx;
CPT_CLUST = foreach (group PTS_TOP_CPT by cpt_inx parallel 10) generate PTS_TOP_CPT.(npi, cpt_vec) as clust_bag;
-- Use RANK to associate each cluster with a clust_id
RANKED = RANK CPT_CLUST;
ID_WITH_CLUST = foreach RANKED generate $0 as clust_id, clust_bag;
-- Compute pairs of NPIs that are similar to each other, with a cosine similarity score of 0.85 or higher.
-- We implement a few tricks to optimize performance:
-- 1. Split very long clusters into smaller sub-clusters, with max size of 2000 NPIs in each cluster
-- 2. Re-shuffle clusters using a random number to minimize skew
-- 3. Use 'replicated' join for map-side join
ID_WITH_SMALL_CLUST = foreach ID_WITH_CLUST generate clust_id, FLATTEN(udfs.breakLargeBag(clust_bag, 2000)) as clust_bag;
ID_WITH_SMALL_CLUST_RAND = foreach ID_WITH_SMALL_CLUST generate clust_id, clust_bag, RANDOM() as r;
ID_WITH_SMALL_CLUST_SHUF = foreach (GROUP ID_WITH_SMALL_CLUST_RAND by r parallel 240) generate FLATTEN($1) as (clust_id, clust_bag, r);
NPI_AND_CLUST_ID = foreach ID_WITH_CLUST generate FLATTEN(clust_bag) as (npi: int, cpt_vec), clust_id;
CLUST_JOINED = join ID_WITH_SMALL_CLUST_SHUF by clust_id, NPI_AND_CLUST_ID by clust_id using 'replicated';
PAIRS = foreach CLUST_JOINED generate npi as npi1, FLATTEN(udfs.similarNpi(npi, cpt_vec, clust_bag, 0.85)) as npi2;
-- Remove duplicate pairs
OUT = distinct PAIRS parallel 20;
To improve scalability we apply a few techniques worth noting:
This PIG script utilizes three Python UDFs: top_cpt, breakLargeBag and similarNpi:
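The exact UDF bodies live in udfs.py in the repository. As a rough plain-Python illustration of what they do (the real Jython UDFs also need Pig’s @outputSchema annotations and bag/tuple handling, and the details below are assumptions for the sketch):

```python
import math

def top_cpt(cpt_vec):
    """Return the (cpt_inx, count) entry with the highest count --
    the 'top shared CPT' used to cluster providers."""
    return max(cpt_vec, key=lambda t: t[1])

def break_large_bag(bag, max_size):
    """Split a bag into sub-bags of at most max_size elements."""
    return [bag[i:i + max_size] for i in range(0, len(bag), max_size)]

def _cosine(v1, v2):
    dot = sum(v1[c] * v2[c] for c in set(v1) & set(v2))
    n1 = math.sqrt(sum(x * x for x in v1.values()))
    n2 = math.sqrt(sum(x * x for x in v2.values()))
    return dot / (n1 * n2) if n1 and n2 else 0.0

def similar_npi(npi, cpt_vec, clust_bag, threshold):
    """Return the NPIs in clust_bag whose CPT vectors have at least
    2 codes in common with cpt_vec and cosine similarity at or above
    the threshold (skipping the provider itself)."""
    v1 = dict(cpt_vec)
    out = []
    for npi2, vec2 in clust_bag:
        if npi2 == npi:
            continue
        v2 = dict(vec2)
        if len(set(v1) & set(v2)) >= 2 and _cosine(v1, v2) >= threshold:
            out.append(npi2)
    return out
```
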
That’s it. When the computation is finished, the PIG script generates an output file “graph”, which is simply a table of (source, dest) pairs representing the edges of our graph.
We ran this PIG script on a small HDP 2.2 cluster with 8 nodes, each with 4 cores and 64GB of memory, and the total runtime was about 3 hours. With stronger hardware we can of course parallelize this further and achieve a faster overall runtime.
In this blog post we described how to use Apache Pig and Python to implement scalable computation of similarity scores between medical providers based on the Medicare-B dataset, and generate a graph based on these scores.
In the 3rd and last part of this blog post series, we will demonstrate how to use SociaLite to compute personalized PageRank on the graph and identify anomalies.