March 12, 2015

Using PageRank to Detect Anomalies and Fraud in Healthcare

This three-part series is co-authored by Ofer Mendelevitch, director of data science at Hortonworks, and Jiwon Seo, Ph.D., research assistant at Stanford University.


This is the second part of our blog-post series about anomaly detection from healthcare data. As described in part 1, our goal is to apply the personalized-PageRank algorithm to detect anomalies in healthcare payment records, specifically the publicly available Medicare-B dataset.

In this blog post, we demonstrate the technical steps to compute the similarity graph between medical providers at scale, using HDP and Apache Pig.

The similarity computation is performed in two steps: data preparation and similarity computation.

Step 1: Data preparation and cleanup with Python

As described in part 1, the Medicare-B dataset includes a set of records documenting various characteristics of each transaction between medical providers and CMS (the Centers for Medicare and Medicaid Services). We extract only four of those fields, namely:

  • Provider NPI: a unique identifier of the medical provider
  • Specialty: the medical specialty of this provider
  • Procedure code: a specific code identifying the procedure type
  • Count: we use the bene_day_srvc_cnt field from the dataset, which measures the number of distinct beneficiary/per day services listed
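As an illustration, extracting these four fields from a tab-separated export of the dataset might look like the Python sketch below. The column names used here are assumptions for illustration, not necessarily the dataset's exact headers:

```python
import csv

def extract_fields(path):
    """Extract (npi, specialty, hcpcs_code, bene_day_srvc_cnt) tuples
    from a tab-separated Medicare-B export. Column names are assumed."""
    records = []
    with open(path, newline="") as f:
        for row in csv.DictReader(f, delimiter="\t"):
            records.append((
                row["npi"],
                row["provider_type"],       # specialty column (assumed name)
                row["hcpcs_code"],
                int(row["bene_day_srvc_cnt"]),
            ))
    return records
```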

While working on this problem, we identified a non-trivial data quality issue around provider specialty: the “specialty” field values in the original Medicare-B dataset are sometimes inaccurate and contain the wrong specialty value.

This data quality issue impacts our anomaly detection algorithm. To remediate this issue we performed some pre-processing using two additional datasets:

  1. The national provider detail dataset
  2. The NUCC medical provider specialty taxonomy

We implement a simple “specialty lookup” solution as follows:

  1. Lookup the specialty of each provider using the national provider detail dataset, and the NUCC dataset
  2. Extract the necessary fields from the Medicare-B dataset and replace the specialty field with the one from the first step

We then go through the Medicare-B dataset and replace the provided specialty with a more accurate specialty from our lookup. As a further refinement, we filter out providers that are not individuals (i.e., we throw away organizations) and also those who have more than a single specialty. We allow ourselves these simplifications since this is not meant as a comprehensive solution, noting that a real implementation would ideally handle organizations and multi-specialty providers in a more principled way.
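The lookup-and-replace step can be sketched as follows. Here `npi_to_taxonomy` and `taxonomy_to_specialty` stand in for lookups built from the national provider detail dataset and the NUCC taxonomy respectively; all names are illustrative, not the actual script's API:

```python
def relabel_specialties(records, npi_to_taxonomy, taxonomy_to_specialty):
    """Replace each record's specialty with one looked up via the
    provider-detail and NUCC taxonomy mappings. Providers whose NPI is
    absent from the lookup (e.g. organizations or multi-specialty
    providers we chose to drop) are filtered out."""
    cleaned = []
    for npi, _old_specialty, cpt, count in records:
        taxonomy = npi_to_taxonomy.get(npi)
        if taxonomy is None:
            continue  # not an individual, single-specialty provider
        specialty = taxonomy_to_specialty.get(taxonomy)
        if specialty is None:
            continue  # taxonomy code missing from the NUCC mapping
        cleaned.append((npi, specialty, cpt, count))
    return cleaned
```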

Since this pre-processing is simple and fast, we implemented it as a Python script. We won’t go into the details of that code here – it’s available for review by the interested reader on the Github repository.

Step 2: Computing similarity between providers

The script gen-graph.pig (along with a companion file that contains a few Python UDFs) is used to compute similarity between each pair of providers and build the similarity graph.

The first part of the script performs some basic data munging. It loads the raw dataset and generates a mapping of each entity type (specialty, CPT code, NPI) to an integer value (0-based index), used later by SociaLite.

This is quite straightforward — after filtering out empty values, we use PIG’s DISTINCT and RANK operators to perform this task for each entity type, resulting in 3 output files:

  1. “specialty”: maps specialties to a 0-based index
  2. “hcpcs-code”: maps each CPT code to a 0-based index as well as its string description
  3. “npi-mapping”: maps each NPI (provider ID) to a 0-based index

Here is the PIG code to generate the “specialty” file:

RAW = load 'raw_data' using PigStorage('\t') as (
npi: chararray, specialty: chararray,
hcpcs_code: chararray, hcpcs_description: chararray,
bene_day_srvc_cnt: int);
DATA = foreach RAW generate npi, specialty, hcpcs_code as cpt,
hcpcs_description as cpt_desc, bene_day_srvc_cnt as count;

-- generate specialty file
SP1 = foreach DATA generate specialty;
SP2 = filter SP1 by specialty != '';
SP3 = distinct SP2 parallel 5;
SP4 = rank SP3 by * ASC DENSE;
SPECIALTY = foreach SP4 generate $1 as sp_name, (int)$0-1 as sp_index;
rmf medicare/specialty
store SPECIALTY into 'medicare/specialty' using PigStorage('\t');

Note our usage of PIG’s “rank” operator to generate the 0-based index.
Similar code is used to generate hcpcs-code and npi-mapping.

In addition to these 3 files, we also generate the “npi-cpt-code” file, which contains tuples of the form:

(npi, specialty, cpt, count)

For each provider NPI (with a known specialty) and CPT code, the “count” field in the tuple reflects the number of distinct beneficiary/per-day services listed in the dataset. The PIG code below shows how:

-- generate final dataset with npi, specialty, CPT and count
-- (NPI_MAPPING, SPECIALTY and HCPCS are the mapping relations generated above)
DATA0 = filter DATA by NOT(cpt_desc MATCHES '.*[Oo]ffice/outpatient visit.*'); -- remove 'too common' CPT for regular office visit
DATA1 = join DATA0 by npi, NPI_MAPPING by npi using 'replicated';
DATA2 = join DATA1 by classification, SPECIALTY by sp_name using 'replicated';
DATA3 = join DATA2 by cpt, HCPCS by cpt_code using 'replicated';
DATA4 = foreach DATA3 generate npi_index, sp_index, cpt_index, count;
NPI_CPT_CODE = foreach (group DATA4 by (npi_index, sp_index, cpt_index) parallel 20)
generate group.npi_index as npi,
group.sp_index as specialty,
group.cpt_index as cpt_inx, SUM(DATA4.count) as count;
rmf medicare/npi-cpt-code
store NPI_CPT_CODE into 'medicare/npi-cpt-code' using PigStorage('\t');

After filtering out CPT codes that are very common (for example, “office/outpatient visits”), we just join the necessary PIG relations (since they are small enough to fit in memory, we use a replicated join to maximize performance), and extract the 4 fields we need.

Next, we compute the similarity score between each pair of providers. We use cosine similarity [1], which in our case results in a score between 0 and 1 (1 represents most similar, 0 not similar at all).

We use a threshold of 0.85, and create an edge in the graph between two providers if: (1) they have at least 2 CPT codes in common and (2) their similarity score is above 0.85. We experimented with a few threshold values, and found that our method also works well with similar values such as 0.75 or 0.8.
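As a sketch of this edge rule (not the actual Pig/UDF implementation), here is cosine similarity over two providers' sparse CPT-count vectors, with the shared-codes and threshold checks:

```python
import math

def cosine_similarity(vec_a, vec_b):
    """Cosine similarity between two sparse CPT-count vectors, given as
    dicts mapping CPT code -> count. With non-negative counts, the
    result is always in [0, 1]."""
    common = set(vec_a) & set(vec_b)
    dot = sum(vec_a[c] * vec_b[c] for c in common)
    norm_a = math.sqrt(sum(v * v for v in vec_a.values()))
    norm_b = math.sqrt(sum(v * v for v in vec_b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def is_edge(vec_a, vec_b, threshold=0.85):
    """Create an edge only if the two providers share at least 2 CPT
    codes AND their cosine similarity exceeds the threshold."""
    if len(set(vec_a) & set(vec_b)) < 2:
        return False
    return cosine_similarity(vec_a, vec_b) > threshold
```

For example, two providers with proportional CPT-count vectors score 1.0 and are connected, while providers sharing only one code are never connected regardless of score.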

Computing similarity for each pair of providers for a dataset of over 880,000 providers is a non-trivial task, resulting in quite a bit of computation. We utilize Hadoop to parallelize this computation, along with some heuristics to reduce the computation load. Let’s look at the code:

First, we set some Pig configuration parameters to make sure PIG compresses temporary output files, and also increase the default memory used by PIG to store bags as follows:

SET pig.tmpfilecompression true
SET pig.tmpfilecompression.codec gzip
SET pig.cachedbag.memusage 0.5

Before we compute similarity, we apply a few heuristics to filter the following out of our base dataset:

  1. CPT codes that are extremely popular such as office visits, usually with total count above 10M
  2. NPIs who have less than 3 CPT codes; we remove such providers since the data we have on them with regards to CPT code pattern is too small and we consider it noisy

-- Filter out noisy CPT codes and noisy NPIs
CODE_GRP = group NPI_CPT_CODE by cpt_inx parallel 5;
CNT_PER_CODE = foreach CODE_GRP generate group as cpt_inx, SUM($1.count) as cpt_total;
VALID_CODES = filter CNT_PER_CODE by cpt_total <= 10000000; -- Only keep CPTs where total count <= 10M
JOINED = join NPI_CPT_CODE by cpt_inx, VALID_CODES by cpt_inx;
JOINED_WITH_VALID_CODES = foreach JOINED generate npi, NPI_CPT_CODE::cpt_inx as cpt_inx, count;
NPI_GRP = group JOINED_WITH_VALID_CODES by npi parallel 5;
CNT_PER_NPI = foreach NPI_GRP generate group as npi, COUNT($1) as npi_count;
VALID_NPIS = filter CNT_PER_NPI by npi_count >= 3;
JOINED2 = join JOINED_WITH_VALID_CODES by npi, VALID_NPIS by npi; -- keep only valid NPIs
DATA = foreach JOINED2 generate VALID_NPIS::npi as npi, (int)JOINED_WITH_VALID_CODES::cpt_inx as cpt_inx, count;

Finally, we compute similarity between NPIs, and keep only those whose similarity score is above 0.85, following these steps:

  1. Generate the relation PTS, which contains for each NPI its “CPT vector” as a bag in PIG
  2. For each NPI we determine its most important CPT codes using the Python UDF top_cpt(). We then cluster all the NPIs that share a CPT code together in a bag, resulting in the relation CPT_CLUST.
  3. Compute similarity between each pair of NPIs that share the same CPT in their “Top CPT” vector, and output those pairs with similarity score above our threshold of 0.85

The PIG code to accomplish the above is:

-- Create PTS: set of tuples, for each NPI its vector of CPT codes and associated counts (aka cpt_vec)
GRP = group DATA by npi parallel 10;
PTS = foreach GRP generate group as npi, DATA.(cpt_inx, count) as cpt_vec;

-- GROUP PTS into clusters keyed by top shared CPT
PTS_TOP = foreach PTS generate npi, cpt_vec, FLATTEN(udfs.top_cpt(cpt_vec)) as (cpt_inx: int, count: int);
PTS_TOP_CPT = foreach PTS_TOP generate npi, cpt_vec, cpt_inx;
CPT_CLUST = foreach (group PTS_TOP_CPT by cpt_inx parallel 10) generate PTS_TOP_CPT.(npi, cpt_vec) as clust_bag;

-- Use RANK to associate each cluster with a clust_id
RANKED = rank CPT_CLUST;
ID_WITH_CLUST = foreach RANKED generate $0 as clust_id, clust_bag;

-- Compute pairs of NPIs that are similar to each other, with a cosine similarity score of 0.85 or higher.
-- We implement a few tricks to optimize performance:
-- 1. Split very long clusters into smaller sub-clusters, with max size of 2000 NPIs in each cluster
-- 2. Re-shuffle clusters using a random number to minimize skew
-- 3. Use 'replicated' join for map-side join
ID_WITH_SMALL_CLUST = foreach ID_WITH_CLUST generate clust_id, FLATTEN(udfs.breakLargeBag(clust_bag, 2000)) as clust_bag;
ID_WITH_SMALL_CLUST_RAND = foreach ID_WITH_SMALL_CLUST generate clust_id, clust_bag, RANDOM() as r;
ID_WITH_SMALL_CLUST_SHUF = foreach (GROUP ID_WITH_SMALL_CLUST_RAND by r parallel 240) generate FLATTEN($1) as (clust_id, clust_bag, r);
NPI_AND_CLUST_ID = foreach ID_WITH_CLUST generate FLATTEN(clust_bag) as (npi: int, cpt_vec), clust_id;
CLUST_JOINED = join ID_WITH_SMALL_CLUST_SHUF by clust_id, NPI_AND_CLUST_ID by clust_id using 'replicated';
PAIRS = foreach CLUST_JOINED generate npi as npi1, FLATTEN(udfs.similarNpi(npi, cpt_vec, clust_bag, 0.85)) as npi2;

-- Remove duplicate pairs
OUT = distinct PAIRS parallel 20;

To improve scalability we apply a few techniques worth noting:

  1. We use PIG’s “parallel” keyword in various places to guide Pig to use more reducers and further parallelize these operations
  2. We use Pig’s “replicated join” (aka map-side join) when generating CLUST_JOINED
  3. Because NPI clusters vary in size, from large to small, we have data skew, which results in uneven computation time across reducers. We therefore use two techniques to reduce data skew:
    • We split large clusters into a set of smaller clusters (no more than 2000 NPIs in a cluster), and thus reduce the largest cluster size – ID_WITH_SMALL_CLUST
    • We reshuffle the clusters with a random number generator, so that the really large clusters end up split evenly between reducers – ID_WITH_SMALL_CLUST_SHUF.
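The cluster-splitting heuristic amounts to chunking a bag into pieces of bounded size. A minimal sketch of what breakLargeBag does (the actual UDF is in the repository and may differ in details):

```python
def break_large_bag(bag, max_size=2000):
    """Split a cluster (a list of items) into sub-clusters of at most
    max_size items each, so that no single reducer is handed one
    oversized cluster."""
    return [bag[i:i + max_size] for i in range(0, len(bag), max_size)]
```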

This PIG script utilizes three Python UDFs: top_cpt, breakLargeBag and similarNpi:

  • top_cpt retrieves the most important CPT codes for a provider, as described above
  • breakLargeBag breaks-up large clusters into smaller ones, where each smaller one contains no more than a specific number of items
  • similarNpi is the main UDF that computes pairs of NPIs which are considered “similar” (similarity score above the 0.85 threshold).
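For illustration, top_cpt's job can be sketched as selecting a provider's highest-count CPT codes; the exact selection rule in the real UDF may differ, and `k` here is a hypothetical parameter:

```python
def top_cpt(cpt_vec, k=3):
    """Return the k (cpt_inx, count) pairs with the largest counts from
    a provider's CPT vector, i.e. the provider's most important codes."""
    return sorted(cpt_vec, key=lambda pair: pair[1], reverse=True)[:k]
```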

That’s it. When the computation is finished, the PIG script generates an output file, “graph”, which is simply a table of (source, dest) pairs representing the edges of our graph.

HDP Configuration

We ran this PIG script on a small HDP 2.2 cluster with 8 nodes, each with 4 cores and 64GB of memory, and the total runtime was about 3 hours. With stronger hardware we could of course parallelize this further and get a faster overall runtime.


In this blog post we described how to use Apache Pig and Python to implement scalable computation of similarity scores between medical providers based on the Medicare-B dataset, and generate a graph based on these scores.

In the third and last part of this blog post series, we will demonstrate how to use SociaLite to compute personalized PageRank on the graph and identify anomalies.
