March 27, 2015

Using PageRank to Detect Anomalies and Fraud in Healthcare

This three part series is co-authored by Ofer Mendelevitch, director of data science at Hortonworks, and Jiwon Seo, Ph.D. and research assistant at Stanford University.


This is the third part of the blog-post series about anomaly detection from healthcare data.

In part 1, we described the dataset, the business use-case and our general approach of applying graph algorithms (specifically the personalized-PageRank algorithm) to detect anomalies in the Medicare-B dataset.

In part 2, we described how we use Apache Hadoop, with Hortonworks Data Platform (HDP), and Pig to preprocess the Medicare-B dataset to make it ready for anomaly detection. First, we measure similarities between medical providers by computing cosine similarities of their medical procedure (CPT) codes. Then we generate an undirected graph where nodes represent providers (NPI codes), and the connecting edges indicate that the similarity scores between those nodes are higher than an acceptable threshold. We refer to this graph as the “provider graph.”
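To make the similarity step concrete, here is a minimal plain-Python sketch; all NPIs, CPT codes, frequencies, and the threshold are invented for illustration (the actual pipeline, described in part 2, runs at scale with Pig on HDP):

```python
import math

# Toy per-provider CPT-code frequency profiles; every value here is made up.
profiles = {
    "npi_a": {"99213": 40, "99214": 25, "71020": 5},
    "npi_b": {"99213": 35, "99214": 30},
    "npi_c": {"70450": 50, "70553": 20},
}

def cosine(u, v):
    """Cosine similarity between two sparse code-frequency vectors."""
    dot = sum(u[c] * v[c] for c in u if c in v)
    norm_u = math.sqrt(sum(x * x for x in u.values()))
    norm_v = math.sqrt(sum(x * x for x in v.values()))
    return dot / (norm_u * norm_v)

THRESHOLD = 0.5  # illustrative cutoff, not the value used in the series
providers = sorted(profiles)
edges = [
    (p, q)
    for i, p in enumerate(providers)
    for q in providers[i + 1:]
    if cosine(profiles[p], profiles[q]) >= THRESHOLD
]
print(edges)  # only npi_a and npi_b share enough codes to be linked
```

The resulting undirected edge list is the "provider graph" that the rest of this post analyzes.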

In this part 3, we will demonstrate how we can identify anomalies by performing analysis on the generated provider graph. To simplify the analysis process, we use SociaLite, an open-source graph analysis framework, which provides high-level abstractions for graphs.

We start with an overview of SociaLite, and some details about SociaLite queries, followed by the implementation details of our variant of the personalized PageRank algorithm to find anomalies in the provider graph.

SociaLite – an open-source graph analysis framework

SociaLite is a large-scale graph analysis framework with a high-level query language. Graph analysis is implemented as queries, which are compiled into efficient parallel and distributed code. SociaLite queries are embedded in Python code, which makes it possible to implement analysis partly in Python and partly in SociaLite. SociaLite also provides an HDFS (Hadoop Distributed File System) interface, which allows reading data from and writing data to HDFS.

SociaLite supports two modes: a “standalone” mode and a “distributed” mode. By default, it runs on a single machine in standalone mode. If a SociaLite cluster is set up and running, queries are compiled to run on the cluster (see the instructions on the SociaLite homepage).

In this demo, we will read the “provider graph” and all other necessary files directly from HDFS, and then run SociaLite in standalone mode.

Tables in SociaLite

In SociaLite, data is stored in column-oriented tables in memory (and across distributed machines in “distributed” mode). The following SociaLite code will create a table named Graph:

Graph(int src, int target)

The table “Graph” has two columns named src and target that represent edges in a graph. SociaLite creates a table that is partitioned by the first column (src), and in distributed mode the partitions are stored across multiple machines.

The following code creates the “Graph” table with a slightly different structure:

Graph(int src, (int target)) indexby src

The parentheses around the target column indicate that the column is nested; that is, for one vertex (or node) in the src column, multiple vertices can be stored in the target column. This helps to compactly store data with nested structures such as graphs. For performance gains, the “indexby src” option tells SociaLite to add an index on the src column, which results in faster joins.

In the following code, we further annotate the src column with a value range of 0 to 1000:

Graph(int src:0..1000, (int target)) indexby src

With this annotation, the src column is represented by SociaLite as array indices, which reduces the memory footprint for storing the “Graph” table.
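As a rough mental model (not SociaLite's actual internals), the range-annotated nested table behaves like a dense array of neighbor lists, where the src value doubles as the array index:

```python
# Graph(int src:0..1000, (int target)) roughly corresponds to a dense
# array of neighbor lists: src is the array index, and the nested
# column stores all targets for that source in one slot.
MAX_SRC = 1000
graph = [[] for _ in range(MAX_SRC + 1)]

edges = [(0, 5), (0, 9), (1, 5)]  # toy edges
for src, target in edges:
    graph[src].append(target)

print(graph[0])  # both targets nested under source 0
```

Because the src values index directly into the array, no per-row storage of src is needed, which is where the memory savings come from.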

Queries in SociaLite

Data stored in tables is processed using SociaLite queries, each consisting of a head and a body separated by a “colon-dash” (:-). Here is an example query:

Foaf(i, ff) :- Graph(i, f), Graph(f, ff)

This query computes friends-of-friends. In the body, which is on the right of the colon-dash, the “Graph” table is joined with itself, where the variable f indicates the join key, and appears as the second column in the left side of the join, and the first column in the right side of the join. In the head, the join result is stored in another table – Foaf.

In other words, the query is evaluated as follows:

For each tuple (i, f) in Graph (the first table in the body)
(a) we find matching tuples (f, ff) in Graph (second table in the body),
whose first column matches with the second column of (i, f).
(b) After the matching, we store the result (i, ff) into Foaf table.
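The evaluation steps above can be sketched in plain Python, with a toy edge list standing in for the Graph table:

```python
# Foaf(i, ff) :- Graph(i, f), Graph(f, ff) -- the self-join, spelled out.
edges = [(1, 2), (2, 3), (2, 4)]  # toy Graph table

foaf = set()
for i, f in edges:            # first occurrence of Graph in the body
    for f2, ff in edges:      # second occurrence of Graph in the body
        if f2 == f:           # join on the shared variable f
            foaf.add((i, ff))

print(sorted(foaf))  # [(1, 3), (1, 4)]
```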

As another example, in the following code we use the built-in function “$inc” to compute the number of friends-of-friends for each vertex i.

FoafCount(i, $inc(1)) :- Graph(i, f), Graph(f, ff)

For each matching tuple, we increment the count by 1 with the $inc SociaLite built-in function.
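A similar plain-Python sketch of the counting query, using a Counter in place of $inc (toy data again):

```python
from collections import Counter

# FoafCount(i, $inc(1)) :- Graph(i, f), Graph(f, ff)
# Every matching (i, f, ff) combination bumps i's counter by one.
edges = [(1, 2), (2, 3), (2, 4)]  # toy Graph table

foaf_count = Counter()
for i, f in edges:
    for f2, ff in edges:
        if f2 == f:
            foaf_count[i] += 1

print(dict(foaf_count))  # vertex 1 has two friends-of-friends
```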

SociaLite – Python Integration

SociaLite code can be embedded inside Python code, making it possible to write analysis code in both Python and SociaLite. To distinguish SociaLite code from Python code, SociaLite code is enclosed in backticks. Referring to Python variables or functions inside a backticked expression is done using the $ sign.

Here is an example code that filters medical providers having a specific specialty:

specialties = [0, 1, 2, 3]
for specialty in specialties:
    `Seed(npi) :- Doctor(npi, specialty), specialty==$specialty.`
    for npi in `Seed(npi)`:
        print npi

With the Python code above, we iterate over the specialties; then, with the SociaLite code in backticks, we find the doctors having a specific specialty. The variable $specialty in the SociaLite code refers to the variable defined in Python, and the expression specialty==$specialty filters tuples whose specialty is the same as the $specialty value (0 for the first iteration, 1 for the second, and so on). Then, with the for-loop below the SociaLite code, we iterate over the values in the Seed table and print them.

SociaLite Installation

SociaLite is included in the GitHub repository for the demo code here. To run SociaLite interactively, clone the repository, compile SociaLite, and simply run socialite/bin/socialite. Full instructions are available here.

For more information on SociaLite, please visit the GitHub repository.

Finding Anomalies in Medicare Data with SociaLite

Next, we describe our Python and SociaLite implementation for using personalized PageRank to find anomalies in the Medicare-B Dataset.

In the previous part of this blog post, we generated the provider graph that includes an edge between each pair of providers that have similar patterns of medical procedures (CPT codes).

The following steps describe the details of our algorithm for anomaly detection:

  1. Select a specialty that we are interested in, e.g., Radiology
  2. Find all providers having the selected specialty
  3. Run personalized PageRank algorithm with the selected providers as “source vertices”
  4. Starting from the provider with the highest PageRank value, we add providers into a cluster, stopping once all the providers with the selected specialty have been added. We expect the majority of the providers in the cluster to have the selected specialty.
  5. Mark the providers in the cluster whose specialty is different from the selected specialty as potential anomalies.
  6. From the potential anomalies, remove the providers whose specialty is too general, such as Physician Assistant or Nurse Practitioner, because they are likely false positives.
  7. Print out the top-K anomalies, namely those nodes (providers) whose PageRank scores are highest among the anomalous providers.

Note that the cluster computed in step 4 considers the overall prescription patterns of the providers in the graph. Even if a provider is not directly connected to the source providers, the algorithm considers its prescription pattern similar to theirs if the provider is indirectly connected to many source providers.

This algorithm is implemented using Python and SociaLite. We describe the important parts of the code below.

Loading The Data

The first few lines of the script load the provider graph and metadata into SociaLite tables. For example, the following code loads the provider graph into the “Graph” table.

path = "hdfs://data/"

graph_file = path + "graph"
`Graph(int npi:0..$MAX_NPI_ID, (int npi2)) multiset.
 Graph(npi1, npi2) :- l=$read($graph_file),
                      (_npi1, _npi2)=$split(l, "\t"),
                      npi1=$toInt(_npi1),
                      npi2=$toInt(_npi2).`

First we define the Graph table that represents edges in the provider graph. The first column is the source provider of an edge, and the second column is the target provider. The second column of the Graph table is nested, to compactly store the multiple vertices that are connected to a source vertex. The multiset option indicates that a tuple can be stored multiple times in the Graph table; the table is then optimized so that it does not check whether a tuple is already present when inserting it. Because we know that the $graph_file does not contain duplicate edges, we use the multiset option for faster loading.

Then we load data from the graph file residing on HDFS, using the built-in $read, $split, and $toInt functions to read lines from the file, split the lines, and transform string values into integer values.

With the path set to “hdfs://data/”, the above code reads the graph file directly from HDFS in /user_home/data/, where user_home is the user’s HDFS home directory. SociaLite refers to the HADOOP_HOME variable set in socialite/conf/ to find the HDFS configuration files, and reads the file through the HDFS interfaces. When running on the same cluster where the HDFS servers are running, SociaLite exploits the locality of the data in HDFS, just like Hadoop does, to read the data efficiently. To read a file from the local file system, simply change path to “file://data”.

The following tables store meta-data of the dataset.

`Specialty(int specialty:0..$MAX_SPECIALTY_ID, String descr).
Doctor(int npi:0..$MAX_NPI_ID, int specialty, (int code, int freq)).
Code(int code:0..$MAX_HCPCS_CODE_ID, String descr).`

The “Specialty” table stores specialty IDs and their descriptions. The “Doctor” table stores the provider IDs, their specialty IDs, and their prescribed CPT codes along with the frequency (or count) of each CPT code. Because each provider prescribes multiple CPT codes, the last two columns of the Doctor table are nested. The “Code” table stores CPT code IDs and their descriptions.

Running the Personalized PageRank Algorithm with SociaLite

After loading all the data into SociaLite tables, we run the PageRank algorithm. We run the algorithm multiple times, once for each specialty group. Certain specialties such as General Practice or Nurse Practitioner are too general and do not work well with the algorithm (this is because a nurse, for example, can perform various procedures across multiple specialties, depending on the case), and so we skip those.

We select the providers whose specialty is the same as the selected specialty; these providers are the source vertices in the algorithm. The providers are stored in the Source table as follows:

`Source(npi) :- Doctor(npi, specialty, _, _), specialty==$specialty.`
`SourceCnt(int n:0..0, int cnt).
SourceCnt(0, $inc(1)) :- Source(npi).`

We also count the number of the providers to compute the initial PageRank value.

Now we define a “Rank” table that stores PageRank values for the providers as follows:

`Rank(int npi:0..$MAX_NPI_ID, int i:iter, float rank).`

The first column of the “Rank” table stores the provider IDs, the second column stores the iteration number of the algorithm, and the third column stores the PageRank score. In other words, Rank(npi, i, rank) stores, for a given provider npi, the PageRank value at iteration i.

Then we initialize the Rank table at iteration 0:

`Rank(source_npi, 0, pr) :- Source(source_npi), pr = 1/$N.`

The initial PageRank values at iteration 0 are 1/$N for the source providers where $N is the number of source providers; for the rest, the initial value is 0 (default for a SociaLite table).

Next, we iteratively compute the PageRank values in just three lines of code, as follows:

for i in range(10):
    `Rank(node, $i+1, $sum(pr)) :- Source(node), pr=0.2f*1.0f/$N;
                                :- Rank(src, $i, pr1), pr1>1e-8, EdgeCnt(src, cnt),
                                   pr = 0.8f*pr1/cnt, Graph(src, node).`

The SociaLite code above has two bodies separated by a semicolon; this means that all the tuples computed from both bodies are added to the table in the head. Recall from part 1 the two possible actions our random surfer can take in each step: walking along an edge to an adjacent vertex, or “teleporting” randomly to one of the source vertices. With that in mind, it’s easy to see how this is implemented in the code:

  • The first body represents the teleportation action in PageRank, where our random surfer teleports with probability 0.2 to one of the vertices in the group of source providers.
  • The second body represents the random-walk action in PageRank, where our random surfer walks to an adjacent vertex with probability 0.8. For each vertex src with an edge between src and node, we retrieve pr1, the PageRank value of src at iteration $i. Then pr1 is divided by the number of edges src has, giving pr, the probability of a random walk from src to one of its neighbor vertices. This value pr is added to all of src’s neighbors, referred to as node, to compute the PageRank value of node at the next iteration ($i+1). In other words, the PageRank value of node is computed by adding up the random-walk probabilities from all of its neighbors. For faster performance, we ignore random walks from vertices whose PageRank value is smaller than 1e-8; this computes approximate PageRank values.

We use the $sum aggregate function in the head to add all the probabilities by the random walks from neighbor vertices to the target vertex.

The iteration is repeated 10 times, which is enough for the PageRank values to converge. For simplicity the convergence test is omitted, but it is easy to add: sum the absolute differences between the PageRank values at iterations i and i+1, and repeat until that sum falls below a small tolerance.
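For reference, here is a plain-Python sketch of the same personalized-PageRank iteration, including a convergence test, on a toy graph (the graph, source set, and tolerances are invented; this is not the SociaLite implementation):

```python
# Personalized PageRank on an adjacency-list graph: teleport with
# probability 0.2 to a source vertex, walk with probability 0.8.
graph = {0: [1, 2], 1: [0, 2], 2: [0], 3: [2]}  # toy provider graph
sources = [0, 1]   # providers with the selected specialty
n = len(sources)

# Iteration 0: rank 1/N on source vertices, 0 elsewhere.
rank = {v: (1.0 / n if v in sources else 0.0) for v in graph}
for _ in range(100):
    # Teleportation: probability 0.2, spread over the source vertices.
    nxt = {v: (0.2 / n if v in sources else 0.0) for v in graph}
    # Random walk: each vertex spreads 0.8 of its rank over its neighbors.
    for src, neighbors in graph.items():
        if rank[src] <= 1e-8:      # skip negligible contributions
            continue
        share = 0.8 * rank[src] / len(neighbors)
        for node in neighbors:
            nxt[node] += share
    # Convergence test: total absolute change between iterations.
    if sum(abs(nxt[v] - rank[v]) for v in graph) < 1e-9:
        rank = nxt
        break
    rank = nxt

print(max(rank, key=rank.get))  # vertex 0 ends up with the highest score
```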

One of the strengths of SociaLite is the ability to express an algorithm such as PageRank in literally 2-3 lines of code as shown above. The declarative nature of SociaLite makes it very powerful for implementing such algorithms quickly and with minimal errors. Then the SociaLite compiler generates optimized parallel/distributed code that runs efficiently on a single multi-core machine as well as a cluster of distributed machines.

Identifying Anomalies

As described in part 1, a principled way to use the resulting PageRank scores is to integrate them into an existing fraud detection system, whether based on rules or on a supervised learning algorithm.

To demonstrate the results of our approach, we identified anomalous providers purely based on their PageRank scores. We consider providers anomalous if their PageRank score is high but their specialty differs from the selected specialty.

The following code finds anomaly candidates:

`AnomalyCandidate(int npi, float rank) multiset.
AnomalyCandidate(npi, rank) :- Rank(npi, $i, rank), Doctor(npi, specialty, _, _),
                               specialty != $specialty, rank>=$threshold.`

We iterate over the providers’ PageRank values and keep those whose PageRank is at least a threshold, set to the minimum PageRank value among the providers having the selected specialty. Of those, we keep only the providers whose specialty differs from the selected specialty.

We then remove likely false positives (providers whose specialty is too general) from AnomalyCandidate, and report the top-5 anomalous providers by PageRank value among the remaining candidates.
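As a plain-Python sketch of this filtering step, with invented scores, specialties, and NPIs:

```python
# Filter anomaly candidates from final PageRank scores: providers whose
# score is at least the minimum score among the selected-specialty
# providers, but whose own specialty differs. All data here is made up.
rank = {10: 0.30, 11: 0.25, 12: 0.28, 13: 0.05, 14: 0.40}
specialty_of = {10: "radiology", 11: "radiology", 12: "cardiology",
                13: "cardiology", 14: "physician assistant"}
too_general = {"physician assistant", "nurse practitioner"}
selected = "radiology"

threshold = min(r for npi, r in rank.items() if specialty_of[npi] == selected)
candidates = [
    (npi, r) for npi, r in rank.items()
    if r >= threshold
    and specialty_of[npi] != selected
    and specialty_of[npi] not in too_general   # drop likely false positives
]
top5 = sorted(candidates, key=lambda t: -t[1])[:5]
print(top5)
```

Note that provider 14 scores above the threshold but is excluded as a too-general specialty, which mirrors step 6 of the algorithm.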

Running the algorithm

To test the algorithm, simply download the demo code from GitHub and run the script.

Hardware Configuration

We ran the algorithm on a single machine (Intel Xeon CPU E5-2640 (2.50GHz) with 20GB of memory). Even though our provider graph is relatively large (close to 800M edges), it takes only around 5 to 6 minutes to load the graph, and 10 to 30 seconds to run the algorithm for each selected specialty group.


In this blog post, we described how to use SociaLite, an open-source graph analysis framework, to compute a personalized PageRank score for each node, and how to use those scores for anomaly detection in the provider graph.

Learn More

All the code associated with this blog post series is available here.

Also, read part 1 and part 2 of this series.



