Magellan: Geospatial Analytics on Spark

Geospatial data is pervasive—in mobile devices, sensors, logs, and wearables. This data’s spatial context is an important variable in many predictive analytics applications.

To benefit from spatial context in a predictive analytics application, we need to be able to parse geospatial datasets at scale, join them with target datasets that contain point in space information, and answer geometrical queries efficiently.

Unfortunately, if you are working with geospatial data and big data sets that need spatial context, there are limited open source tools that make it easy for you to parse and efficiently query spatial datasets at scale. This poses significant challenges for leveraging geospatial data in business intelligence and predictive analytics applications.

This is the problem that Magellan sets out to solve. Magellan is an open source library for Geospatial Analytics that uses Apache Spark as the underlying execution engine. Magellan facilitates geospatial queries and builds upon Spark to solve hard problems of dealing with geospatial data at scale.

In this blog post, we will introduce the problem of geospatial analytics and show how Magellan allows users to ingest geospatial data and run spatial queries at scale.

To do so, we will use Uber data to examine the flow of Uber traffic in the city of San Francisco.

Mapping the flow of Uber traffic in San Francisco with Magellan

Uber has published a dataset of GPS coordinates of all trips within San Francisco.

Our goal in this example is to join the Uber dataset with the San Francisco neighborhoods dataset to gain insight into the patterns of Uber trips in San Francisco.

Magellan has both Scala and Python bindings. In this blog post we use the Scala APIs.
Magellan is a Spark Package, and can be included when launching the Spark shell as follows:

            bin/spark-shell --packages harsha2010:magellan:1.0.3-s_2.10

The following imports are needed:

 

import magellan.{Point, Polygon, PolyLine}
import magellan.coord.NAD83
import org.apache.spark.sql.magellan.MagellanContext
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

First, we need to read the Uber dataset. We assume the dataset has been downloaded and the path to the dataset is uber.path.
Let us create a case class to attach a schema to the Uber dataset so we can use the DataFrame abstraction to work with the data.

case class UberRecord(tripId: String, timestamp: String, point: Point)

Now we can read the dataset into a dataframe and cache the resulting dataframe.

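// ${uber.path} is a placeholder: substitute the path to the downloaded file.
val uber = sc.textFile(${uber.path}).map { line =>
  // Fields are tab-separated: trip id, timestamp, latitude, longitude.
  val parts = line.split("\t")
  val tripId = parts(0)
  val timestamp = parts(1)
  // Point takes (x, y), that is (longitude, latitude).
  val point = Point(parts(3).toDouble, parts(2).toDouble)
  UberRecord(tripId, timestamp, point)
}.
  repartition(100).
  toDF().
  cache()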

This dataset contains the trip id, the timestamp and the latitude and longitude of each point on the trip coalesced into a Point data structure.

A Point is the simplest geometric data structure available in Magellan. It represents a two dimensional point, with x and y coordinates. In this case, as is standard in geospatial analysis, the x coordinate refers to the longitude and the y coordinate the latitude.
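
The field order is easy to get wrong, so here is a minimal plain Scala sketch of the same parsing step without Spark or Magellan. The names LonLat, Record and parse are hypothetical stand-ins introduced for illustration, and the sketch assumes the tab-separated layout used above (trip id, timestamp, latitude, longitude).

```scala
import scala.util.Try

object UberLineParsing {
  // Hypothetical stand-in for magellan.Point: x is longitude, y is latitude.
  final case class LonLat(x: Double, y: Double)
  final case class Record(tripId: String, timestamp: String, point: LonLat)

  // Returns None for lines with too few fields or non-numeric coordinates.
  def parse(line: String): Option[Record] =
    line.split("\t") match {
      case Array(id, ts, lat, lon, _*) =>
        // The file stores latitude before longitude; swap them into (x, y).
        Try(Record(id, ts, LonLat(lon.toDouble, lat.toDouble))).toOption
      case _ => None
    }
}
```

Returning an Option lets a caller drop malformed lines with flatMap instead of failing the whole job, which is one possible design choice rather than what the snippet above does.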

On its own, this dataset is not very interesting. To enrich it, we need to determine which neighborhood each of these points lies in.

To do so, we will convert the neighborhood dataset into a dataframe as well, assuming the dataset has been downloaded and the path to the dataset is neighborhoods.path.

This dataset is in what is known as the ESRI Shapefile format.

This is one of the most common formats in which geospatial data is stored. Magellan has a Data Source implementation that understands how to parse ESRI Shapefiles into Shapes and Metadata.

val magellanContext = new MagellanContext(sc)
// ${neighborhoods.path} is a placeholder for the path to the shapefile.
val neighborhoods = magellanContext.read.format("magellan").
  load(${neighborhoods.path}).
  select($"polygon", $"metadata").
  cache()

There are two columns in this DataFrame: a shape representing the neighborhood which happens to be polygonal, and metadata which is a map of String keys and String values.

Magellan has a Polygon data structure to capture the spatial geometry of a Polygon. A Polygon in Magellan stands for a Polygonal object with zero or more holes.
Map columns can be exploded into their keys and values to yield the following dataframe:

neighborhoods.select(explode($"metadata").as(Seq("k", "v"))).show(5)

 

+----------+--------------------+
|         k|                   v|
+----------+--------------------+
|neighborho|Twin Peaks       ...|
|neighborho|Pacific Heights  ...|
|neighborho|Visitacion Valley...|
|neighborho|Potrero Hill     ...|
|neighborho|Crocker Amazon   ...|
+----------+--------------------+

Now we are getting somewhere: we are able to parse the San Francisco neighborhood dataset and extract both its metadata and the polygon shapes that represent each neighborhood. The natural next step is to join this dataset with the Uber dataset so that each point on an Uber trip can be associated with its corresponding neighborhood.

Here we run into an important spatial query: how do we compute whether a given point (an Uber location) lies within a given polygon (a neighborhood)?

Magellan implements this test as the within operator, along with other spatial operators such as intersects, intersection, contains, and covers, which makes such queries easy to express.
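
To make the point-in-polygon question concrete, here is a minimal plain Scala sketch of the classic even-odd ray casting test. It is not Magellan's implementation; the names P and within are hypothetical, the polygon is a single ring without holes, and points exactly on an edge are not treated specially.

```scala
object PointInPolygon {
  final case class P(x: Double, y: Double)

  // Cast a horizontal ray from p towards +x and count the edges it crosses.
  // An odd number of crossings means p is inside the ring.
  def within(p: P, ring: IndexedSeq[P]): Boolean = {
    // Pair each vertex with the next one, wrapping around to close the ring.
    val edges = ring.indices.map(i => (ring(i), ring((i + 1) % ring.length)))
    val crossings = edges.count { case (a, b) =>
      // The edge must straddle the ray's y value, and the crossing must lie
      // to the right of p.
      (a.y > p.y) != (b.y > p.y) &&
        p.x < a.x + (p.y - a.y) * (b.x - a.x) / (b.y - a.y)
    }
    crossings % 2 == 1
  }
}
```

For a square with corners (0, 0), (4, 0), (4, 4) and (0, 4), the point (2, 2) is reported as inside and the point (5, 2) as outside. Each test costs time proportional to the number of polygon vertices, which is why running it across a large join calls for a distributed engine.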
In Magellan, to join the Uber dataset with the San Francisco neighborhood dataset, you would issue the following Spark SQL query:

neighborhoods.
  join(uber).
  where($"point" within $"polygon").
  select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v"))).
  withColumnRenamed("v", "neighborhood").
  drop("k").
  show(5)

+------+---------+------------+
|tripId|timestamp|neighborhood|
+------+---------+------------+
+------+---------+------------+

This is interesting: According to our calculation, the GPS coordinates representing the Uber dataset do not fall in any of the San Francisco neighborhoods. How can this be?

This is a good point to pause and think about coordinate systems. We have been using GPS coordinates for the Uber dataset, but haven’t verified the coordinate system that the San Francisco neighborhood dataset has been encoded in.

It turns out that most datasets published by US government agencies use what are called State Plane coordinates.

Magellan supports translating between different coordinate systems by implementing a transformer interface which takes in Points and outputs Points.

This covers all conformal transformations, that is, all transformations that preserve angles.
In particular, to translate between WGS84, the standard GPS coordinate system used in the Uber dataset, and NAD83 Zone 403 (State Plane), we can build a transformer on top of the built-in NAD83 support:

val transformer: Point => Point = (point: Point) => {
  val from = new NAD83(Map("zone" -> 403)).from()
  val p = point.transform(from)
  // Scale from meters to feet (1 m is about 3.28084 ft).
  new Point(3.28084 * p.x, 3.28084 * p.y)
}

Here we have defined a new transformer that applies the NAD83 transformation for Zone 403 (Northern California) and further scales the points to have units in feet instead of meters.
This allows us to enhance the Uber dataset by adding a new column that holds the scaled coordinates in the NAD83 State Plane Coordinate System:

val uberTransformed = uber.
  withColumn("nad83", $"point".transform(transformer)).
  cache()
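
Because a transformer is just a function from points to points, the projection step and the unit conversion can be thought of as two functions composed together. The plain Scala sketch below illustrates that idea only. P, shift and projectThenScale are hypothetical names, and shift is a toy stand-in for a real projection, not the NAD83 transformation.

```scala
object TransformerSketch {
  final case class P(x: Double, y: Double)
  type Transformer = P => P

  val FeetPerMeter = 3.28084

  // Uniform scaling preserves angles, so it is a conformal transformation.
  val metersToFeet: Transformer =
    p => P(FeetPerMeter * p.x, FeetPerMeter * p.y)

  // Toy stand-in for a projection: it only moves the origin (in meters).
  def shift(dx: Double, dy: Double): Transformer =
    p => P(p.x + dx, p.y + dy)

  // Transformers are ordinary functions, so they compose with andThen.
  val projectThenScale: Transformer = shift(1000.0, 500.0).andThen(metersToFeet)
}
```

Keeping the unit conversion as its own function makes it easy to check in isolation and to drop when a dataset is already in meters.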

Now we are ready to perform the join again:

val joined = neighborhoods.
  join(uberTransformed).
  where($"nad83" within $"polygon").
  select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v"))).
  withColumnRenamed("v", "neighborhood").
  drop("k").
  cache()

joined.show(5)
 
+------+--------------------+--------------------+
|tripId|           timestamp|        neighborhood|
+------+--------------------+--------------------+
| 00002|2007-01-06T06:23:...|Marina           ...|
| 00006|2007-01-04T01:04:...|Marina           ...|
| 00008|2007-01-03T00:59:...|Castro/Upper Mark...|
| 00011|2007-01-06T09:08:...|Russian Hill     ...|
| 00014|2007-01-02T05:18:...|Mission          ...|
+------+--------------------+--------------------+

OK, this looks much more reasonable!
One interesting question we are now ready to ask is: which neighborhoods do the most Uber trips pass through?

joined.
  groupBy($"neighborhood").
  agg(countDistinct("tripId").as("trips")).
  orderBy(col("trips").desc).
  show(5)

+--------------------+-----+
|        neighborhood|trips|
+--------------------+-----+
|South of Market  ...| 9891|
|Western Addition ...| 6794|
|Downtown/Civic Ce...| 6697|
|Financial Distric...| 6038|
|Mission          ...| 5620|
+--------------------+-----+
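
A trip contributes many GPS points to the same neighborhood, which is why the aggregation counts distinct trip ids rather than rows. The plain Scala sketch below mirrors that logic on an in-memory collection; TripsPerNeighborhood and distinctTrips are hypothetical names used only for illustration.

```scala
object TripsPerNeighborhood {
  // rows holds one (tripId, neighborhood) pair per GPS point.
  def distinctTrips(rows: Seq[(String, String)]): Seq[(String, Int)] =
    rows
      .groupBy { case (_, neighborhood) => neighborhood }
      .map { case (neighborhood, group) =>
        // Count each trip once, however many points it has here.
        neighborhood -> group.map(_._1).distinct.size
      }
      .toSeq
      .sortBy { case (_, trips) => -trips }
}
```

Since one trip can pass through several neighborhoods, the per-neighborhood counts can add up to more than the total number of trips.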

There are 24664 trips for which we have neighborhood information, and close to 40% of them pass through SOMA. If you are an Uber driver, you may just want to hang out around SOMA.
Breaking this analysis down by the neighborhood where each trip originates is just as revealing.

+--------------------+-----+
|  start_neighborhood|count|
+--------------------+-----+
|South of Market  ...| 5697|
|Financial Distric...| 3542|
|Downtown/Civic Ce...| 3258|
|Western Addition ...| 2632|
|Mission          ...| 2332|
|Marina           ...| 1003|
|Nob Hill         ...|  960|
|Pacific Heights  ...|  853|
|Castro/Upper Mark...|  749|
|Russian Hill     ...|  512|
+--------------------+-----+
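
The query behind this breakdown is not shown in this post. One way to sketch the idea in plain Scala is to treat the earliest record of each trip as its origin. The names Hit, startNeighborhoods and countByStart are hypothetical, and the sketch assumes timestamps are ISO 8601 strings in a single time zone, so that string order matches time order.

```scala
object TripOrigins {
  final case class Hit(tripId: String, timestamp: String, neighborhood: String)

  // Map each trip id to the neighborhood of its earliest record.
  def startNeighborhoods(hits: Seq[Hit]): Map[String, String] =
    hits.groupBy(_.tripId).map { case (id, hs) =>
      id -> hs.minBy(_.timestamp).neighborhood
    }

  // Count trips per start neighborhood, largest first.
  def countByStart(hits: Seq[Hit]): Seq[(String, Int)] =
    startNeighborhoods(hits).values
      .groupBy(identity)
      .map { case (neighborhood, group) => neighborhood -> group.size }
      .toSeq
      .sortBy { case (_, count) => -count }
}
```

Swapping minBy for maxBy would give the neighborhood where each trip ends under the same assumptions.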

Out of 24664 trips, 5697 originate in SOMA. That is, 23% of all the Uber trips start in SOMA.
Another interesting question to ask is: what fraction of the Uber trips that originate in SOMA also end in SOMA?

+--------------------+-----+
|    end_neighborhood|count|
+--------------------+-----+
|South of Market  ...| 2259|
|Mission          ...|  911|
|Financial Distric...|  651|
|Downtown/Civic Ce...|  396|
|Western Addition ...|  380|
|Castro/Upper Mark...|  252|
|Potrero Hill     ...|  247|
|Nob Hill         ...|  101|
|Pacific Heights  ...|   56|
|Marina           ...|   51|
|Bernal Heights   ...|   48|
|North Beach      ...|   47|
|Haight Ashbury   ...|   45|
|Russian Hill     ...|   38|
|Chinatown        ...|   35|
|Noe Valley       ...|   32|
|Bayview          ...|   29|
|Treasure Island/Y...|   24|
|Golden Gate Park ...|   15|
|Inner Richmond   ...|   14|
+--------------------+-----+

That is, nearly 40% of all the trips that originate in SOMA (2259 of 5697) also end in SOMA: as far as Uber is concerned, what happens in SOMA stays in SOMA!
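
The percentages quoted in this section follow directly from the counts in the tables. A small plain Scala helper (Shares and percent are hypothetical names) makes the arithmetic explicit:

```scala
object Shares {
  // Share of `part` in `whole`, expressed as a percentage.
  def percent(part: Long, whole: Long): Double = 100.0 * part / whole
}
```

With the counts above, percent(9891, 24664) is about 40.1 (trips passing through SOMA), percent(5697, 24664) is about 23.1 (trips starting in SOMA), and percent(2259, 5697) is about 39.7 (SOMA trips that also end in SOMA).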

As we see, once we add geospatial context to the Uber dataset, we end up with a fascinating array of questions we can ask about the nature of Uber trips in the city of San Francisco.

Summary

In this blog post, we have shown how to use Magellan to perform geospatial analysis on Spark.

Hopefully this short introduction has demonstrated how easy and elegant it is to incorporate geospatial context in your applications using Magellan.

In the next blog post, we will go under the hood to examine how Magellan leverages Spark SQL, DataFrames, and Catalyst to provide simple, elegant user APIs while ensuring that spatial queries execute efficiently.

Ram Sriharsha