This is the third part of the blog-post series about anomaly detection from healthcare data.
In part 1, we described the dataset, the business use-case and our general approach of applying graph algorithms (specifically the personalized-PageRank algorithm) to detect anomalies in the Medicare-B dataset.
In part 2, we described how we use Apache Hadoop, with Hortonworks Data Platform (HDP), and Pig to preprocess the Medicare-B dataset to make it ready for anomaly detection. First, we measure similarities between medical providers by computing cosine similarities of their medical procedure (CPT) codes. Then we generate an undirected graph where nodes represent providers (NPI codes), and the connecting edges indicate that the similarity scores between those nodes are higher than an acceptable threshold. We refer to this graph as the “provider graph.”
In this part 3, we will demonstrate how we can identify anomalies by performing analysis on the generated provider graph. To simplify the analysis process, we use SociaLite, an open-source graph analysis framework, which provides high-level abstractions for graphs.
We start with an overview of SociaLite, and some details about SociaLite queries, followed by the implementation details of our variant of the personalized PageRank algorithm to find anomalies in the provider graph.
SociaLite is a large-scale graph analysis framework with a high-level query language. Graph analysis is implemented as queries, which are compiled into efficient parallel and distributed code. SociaLite queries are embedded in Python code, which makes it possible to implement analysis partly in Python and partly in SociaLite. SociaLite provides an HDFS (Hadoop Distributed File System) interface, which allows reading data from and writing data to HDFS.
SociaLite supports two modes: a “standalone” mode and a “distributed” mode. By default, it runs on a single machine in standalone mode. If a SociaLite cluster is set up and running, queries are compiled to run on the cluster (see the instructions on the SociaLite homepage).
In this demo, we will read the “provider graph” and all other necessary files directly from HDFS, and then run SociaLite in standalone mode.
In SociaLite, data is stored in column-oriented tables in memory (and across distributed machines in “distributed” mode). The following SociaLite code will create a table named Graph:
Graph(int src, int target)
The following code creates the “Graph” table with a slightly different structure:
Graph(int src, (int target)) indexby src
The parentheses around the target column indicate that the column is nested; that is, for one vertex (or node) in the src column, multiple vertices can be stored in the target column. This helps to compactly store data with nested structures such as graphs. For performance gains, the “indexby src” option tells SociaLite to add an index on the src column, which results in faster joins.
In the following code, we further annotate the src column with a value range of 0 to 1000:
Graph(int src:0..1000, (int target)) indexby src
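For intuition, a nested, indexed table like this corresponds roughly to an adjacency list keyed by the source vertex. The following is a hypothetical plain-Python analogue, not SociaLite’s actual storage:

```python
from collections import defaultdict

# Hypothetical analogue of: Graph(int src:0..1000, (int target)) indexby src
# Each src key maps to the nested list of its target vertices; the dict
# itself plays the role of the index on src, giving fast lookup in joins.
graph = defaultdict(list)

edges = [(0, 1), (0, 2), (1, 2)]  # made-up sample edges
for src, target in edges:
    graph[src].append(target)

print(graph[0])  # targets nested under source vertex 0 -> [1, 2]
```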
Data stored in tables is processed using SociaLite queries, each comprised of a head and a body, separated by a “colon-dash”. Here is an example query:
Foaf(i, ff) :- Graph(i, f), Graph(f, ff)
In other words, the query is evaluated as follows:
For each tuple (i, f) in Graph (the first table in the body),
(a) we find matching tuples (f, ff) in Graph (the second table in the body), whose first column matches the second column of (i, f);
(b) for each match, we store the result (i, ff) into the Foaf table.
As another example, in the following code we use the built-in function “$inc” to compute the number of friends-of-friends for each vertex i.
FoafCount(i, $inc(1)) :- Graph(i, f), Graph(f, ff)
For each matching tuple, we increment the count by 1 with the $inc SociaLite built-in function.
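In plain Python, the two queries above amount to a self-join on Graph followed by an aggregation. A rough sketch, assuming the same adjacency-list representation as before:

```python
from collections import defaultdict

graph = {0: [1, 2], 1: [2], 2: [3]}  # made-up sample graph

# Foaf(i, ff) :- Graph(i, f), Graph(f, ff)
# Join: the first column of the second Graph occurrence (f) must match
# the second column of the first; the result (i, ff) goes into Foaf.
foaf = set()
for i, friends in graph.items():
    for f in friends:
        for ff in graph.get(f, []):
            foaf.add((i, ff))

# FoafCount(i, $inc(1)) :- Graph(i, f), Graph(f, ff)
# $inc(1): increment the count once per matching tuple.
foaf_count = defaultdict(int)
for i, friends in graph.items():
    for f in friends:
        for ff in graph.get(f, []):
            foaf_count[i] += 1
```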
SociaLite code can be embedded inside Python code, making it possible to write analysis code in both Python and SociaLite. To distinguish SociaLite code from Python code, SociaLite code is quoted with backticks. Also, referring to Python variables or functions inside a backticked expression is done using the $ sign.
Here is an example code that filters medical providers having a specific specialty:
specialties = [0, 1, 2, 3]
for specialty in specialties:
    `Seed(npi) :- Doctor(npi, specialty), specialty==$specialty.`
    for npi in `Seed(npi)`:
        print(npi)
With the Python code above, we iterate over specialties; then, with the SociaLite code in backticks, we find doctors having a specific specialty. The variable $specialty in the SociaLite code refers to the variable defined in Python, and the condition specialty==$specialty filters tuples whose specialty is the same as the $specialty value (0 for the first iteration, 1 for the second, and so on). Then, with the for-loop below the SociaLite code, we iterate over the values in the Seed table and print them.
SociaLite is included in the GitHub repository for the demo code here. To run SociaLite interactively, clone the repository, compile SociaLite, and simply enter socialite/bin/socialite. Full instructions are available here.
For more information on SociaLite, please visit the GitHub repository.
Next, we describe our Python and SociaLite implementation for using personalized PageRank to find anomalies in the Medicare-B Dataset.
In the previous part of this blog post, we generated the provider graph that includes an edge between each pair of providers that have similar patterns of medical procedures (CPT codes).
The following steps describe the details of our algorithm for anomaly detection:
Note that the cluster computed in step 4 considers the overall prescription patterns of the providers in the graph. Even if a provider is not directly connected to the source providers, the algorithm considers its prescription pattern similar to theirs if the provider is indirectly connected to many source providers.
This algorithm is implemented in find-anomalies.py using Python and SociaLite. We will describe the important parts of the code below.
The first few lines in find-anomalies.py load the provider graph and metadata into SociaLite tables. For example, the following code loads the provider graph into the “Graph” table:
path = "hdfs://data/"
graph_file = path + "graph"
`Graph(int npi:0..$MAX_NPI_ID, (int npi2)) multiset.
 Graph(npi1, npi2) :- l=$read($graph_file),
                      (_npi1, _npi2)=$split(l, "\t"),
                      npi1=$toInt(_npi1), npi2=$toInt(_npi2).`
First, we define the Graph table, which represents edges in the provider graph. The first column is the source provider of an edge, and the second column is the target provider. The second column of the Graph table is nested, to compactly store the multiple vertices connected to a source vertex. The multiset option indicates that a tuple can be stored multiple times in the Graph table; the table is then optimized so that it does not check whether a tuple is already present when inserting. Because we know that $graph_file does not contain redundant values, we use the multiset option for faster loading.
Then we load data from the graph file residing on HDFS, using the built-in $read, $split, and $toInt functions to read lines from the file, split the lines, and transform string values into integer values.
With the path set to “hdfs://data/”, the code above reads the graph file directly from HDFS in /user_home/data/, where user_home is the user’s HDFS home directory. SociaLite refers to the HADOOP_HOME variable in socialite/conf/socialite-env.sh to find the HDFS configuration files, and reads the file through the HDFS interfaces. When running on the same cluster where the HDFS servers are running, SociaLite exploits the locality of the data in HDFS, just as Hadoop does, to efficiently read the data. To read a file from the local file system, simply change path to “file://data“.
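Outside SociaLite, the $read/$split/$toInt chain corresponds to ordinary line parsing. A plain-Python sketch (the function name and sample lines are hypothetical):

```python
from collections import defaultdict

def load_graph(lines):
    """Parse tab-separated "npi1<TAB>npi2" lines into an adjacency list,
    mirroring the $read / $split / $toInt chain in the SociaLite query."""
    graph = defaultdict(list)
    for l in lines:
        _npi1, _npi2 = l.rstrip("\n").split("\t")   # $split(l, "\t")
        graph[int(_npi1)].append(int(_npi2))        # $toInt(...)
    return graph

# Made-up sample edges in the same tab-separated format as the graph file.
graph = load_graph(["0\t1", "0\t2", "1\t2"])
```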
The following tables store metadata of the dataset:
`Specialty(int specialty:0..$MAX_SPECIALTY_ID, String descr).
Doctor(int npi:0..$MAX_NPI_ID, int specialty, (int code, int freq)).
Code(int code:0..$MAX_HCPCS_CODE_ID, String descr).`
The “Specialty” table stores specialty IDs and their descriptions. The “Doctor” table stores the provider IDs, their specialty IDs, and their prescribed CPT codes along with the frequency (or count) of each code. Because each provider prescribes multiple CPT codes, the last two columns of the Doctor table are nested. The “Code” table stores CPT code IDs and their descriptions.
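As with the Graph table, the nested Doctor columns map naturally onto a dict whose values hold a list of (code, freq) pairs. A hypothetical plain-Python analogue, with made-up sample IDs and descriptions:

```python
# Hypothetical plain-Python analogues of the metadata tables.
specialty = {0: "Internal Medicine", 1: "Cardiology"}   # Specialty table
code = {0: "Office visit", 1: "Electrocardiogram"}      # Code table

# Doctor table: npi -> (specialty id, nested list of (code, freq) pairs)
doctor = {
    10: (0, [(0, 25), (1, 3)]),
    11: (1, [(1, 40)]),
}

spec_id, codes = doctor[10]  # unpack one provider's row
```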
After loading all the data into SociaLite tables, we run the PageRank algorithm. We run the algorithm multiple times, once for each specialty group. Certain specialties such as General Practice or Nurse Practitioner are too general and do not work well with the algorithm (this is because a nurse, for example, can perform various procedures across multiple specialties, depending on the case), and so we skip those.
We select providers whose specialty is the same as the selected specialty; these providers are the source vertices in the algorithm. The providers are stored in the Source table as follows:
`Source(npi) :- Doctor(npi, specialty, _, _), specialty==$specialty.`
`SourceCnt(int n:0..0, int cnt).
SourceCnt(0, $inc(1)) :- Source(npi).`
We also count the number of source providers, which is used to compute the initial PageRank values.
Now we define a “Rank” table that stores PageRank values for the providers as follows:
`Rank(int npi:0..$MAX_NPI_ID, int i:iter, float rank).`
The first column of the “Rank” table stores the provider IDs, the second column stores the iteration number of the algorithm, and the third column stores the PageRank score for each provider. In other words, Rank(npi, i, rank) stores, for a given provider npi, the PageRank value rank at iteration i.
Then we initialize the Rank table at iteration 0:
`Rank(source_npi, 0, pr) :- Source(source_npi), pr = 1.0f/$N.`
The initial PageRank values at iteration 0 are 1/$N for the source providers, where $N is the number of source providers; for the rest, the initial value is 0 (the default for a SociaLite table).
Next, we iteratively compute the PageRank values in just three lines of code, as follows:
for i in range(10):
    `Rank(node, $i+1, $sum(pr)) :- Source(node), pr=0.2f*1.0f/$N;
                                :- Rank(src, $i, pr1), pr1>1e-8, EdgeCnt(src, cnt),
                                   pr = 0.8f*pr1/cnt, Graph(src, node).`
The SociaLite code above has two bodies separated by a semicolon; this means that all the tuples computed from both bodies are added to the table in the head. Recall from part 1 the two possible actions our random surfer can take in each step: moving to an adjacent vertex, or “teleporting” randomly to one of the source vertices. With that in mind, it’s easy to see how this is implemented in the code:
We use the $sum aggregate function in the head to sum the probabilities contributed by random walks from neighboring vertices to the target vertex.
The iteration is repeated 10 times, which is enough for the PageRank values to converge. For simplicity the convergence test is omitted, but we can easily add one by summing the absolute differences between the PageRank values at iterations i and i+1, and repeating the computation until that total falls below a small tolerance.
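For readers who prefer plain Python, the full iteration, including a convergence test, can be sketched as follows. This is a rough analogue of the queries above, not SociaLite’s generated code; the tolerance value is an assumption:

```python
def personalized_pagerank(graph, sources, iters=10, tol=1e-6):
    """Personalized PageRank seeded at `sources`, mirroring the SociaLite
    query: 0.2 of the mass teleports back to the source vertices, and each
    vertex spreads 0.8 of its rank evenly over its outgoing edges."""
    n = float(len(sources))
    rank = {npi: 1.0 / n for npi in sources}       # iteration-0 values
    for _ in range(iters):
        # Teleport body: Rank(node, $i+1, ...) :- Source(node), pr=0.2f*1.0f/$N
        new_rank = {npi: 0.2 / n for npi in sources}
        # Propagation body: spread 0.8*pr1/cnt over the neighbors of src
        for src, pr1 in rank.items():
            targets = graph.get(src, [])
            if pr1 <= 1e-8 or not targets:
                continue
            pr = 0.8 * pr1 / len(targets)
            for node in targets:
                new_rank[node] = new_rank.get(node, 0.0) + pr
        # Convergence test: total absolute change between iterations i and i+1
        delta = sum(abs(new_rank.get(k, 0.0) - rank.get(k, 0.0))
                    for k in set(rank) | set(new_rank))
        rank = new_rank
        if delta < tol:
            break
    return rank
```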
One of the strengths of SociaLite is the ability to express an algorithm such as PageRank in literally 2-3 lines of code as shown above. The declarative nature of SociaLite makes it very powerful for implementing such algorithms quickly and with minimal errors. Then the SociaLite compiler generates optimized parallel/distributed code that runs efficiently on a single multi-core machine as well as a cluster of distributed machines.
As described in part 1, a principled way to use the resulting PageRank scores is to integrate them into an existing fraud-detection system, whether based on rules or on a supervised learning algorithm.
To demonstrate the results of our approach, we identified anomalous providers purely based on their PageRank scores. We consider providers anomalous if their PageRank score is high but their specialty differs from the selected specialty.
The following code finds anomaly candidates:
`AnomalyCandidate(int npi, float rank) multiset.
 AnomalyCandidate(npi, rank) :- Rank(npi, $i, rank), Doctor(npi, specialty, _, _),
                                specialty != $specialty, rank >= $threshold.`
We iterate over the providers’ PageRank values and keep those whose PageRank is higher than a threshold. This threshold is set to the minimum PageRank value among the providers having the selected specialty. We then keep only the providers whose specialty is different from the selected specialty.
We remove false positives (providers whose specialty is too general) from AnomalyCandidate, and report the providers with the top 5 PageRank values among the anomaly candidates.
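A plain-Python sketch of this selection step (function and variable names are hypothetical; the threshold rule is as described above):

```python
def find_anomalies(rank, specialty_of, selected_specialty, top_k=5):
    """Flag providers whose PageRank is at least the minimum score among
    providers of the selected specialty, but whose own specialty differs."""
    in_specialty = [r for npi, r in rank.items()
                    if specialty_of.get(npi) == selected_specialty]
    if not in_specialty:
        return []
    threshold = min(in_specialty)
    candidates = [(npi, r) for npi, r in rank.items()
                  if specialty_of.get(npi) != selected_specialty
                  and r >= threshold]
    # Report the top-k anomaly candidates by PageRank score.
    return sorted(candidates, key=lambda t: t[1], reverse=True)[:top_k]
```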
To test the algorithm, simply download the demo code from GitHub and run the script run2.sh.
We ran the algorithm on a single machine (Intel Xeon CPU E5-2640 at 2.50GHz, with 20GB of memory). Even though our provider graph is relatively large (close to 800M edges), it takes only around 5-6 minutes to load the graph, and 10-30 seconds to run the algorithm for each selected specialty group.
In this blog post, we described how to use SociaLite, an open-source graph analysis framework, to compute a personalized PageRank score for each node, and how to use those scores for anomaly detection in the provider graph.
All the code associated with this blog post series is available here.