JSONize Anything in Pig with ToJson

The need for a ToJson EvalFunc

When integrating Pig with different NoSQL ‘databases,’ or when publishing data from Hadoop, it can be convenient to JSONize your data. Pig has long had JsonStorage, but no ToJson EvalFunc. That gap was inconvenient: in our post about Pig and ElasticSearch, creating JSON for ElasticSearch to index required tricks like this:

store enron_emails into '/tmp/enron_emails_elastic' using JsonStorage();
json_emails = load '/tmp/enron_emails_elastic' AS (json_record:chararray);
 
/* Now we can store our email json data to elasticsearch for indexing with message_id. */
store json_emails into 'es://enron/email?json=true&size=1000' USING
  com.infochimps.elasticsearch.pig.ElasticSearchStorage('/me/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/elasticsearch-0.18.6/plugins');

Note how we store as JSON via JsonStorage, then load as a chararray to get the entire record as JSON. It would be more convenient to convert Pig bags and tuples to JSON directly: that would let us keep an ID field as the key and JSONize only the record for that key into a string.

ToJson to the Rescue!

That is precisely what the ToJson UDF in pig-to-json does. It takes a bag, a tuple, or any nested combination thereof and returns a JSON string.

Our data looks like so:

emails: {message_id: chararray,
         date: chararray,
         from: (address: chararray,name: chararray),
         subject: chararray,
         body: chararray,
         tos: {ARRAY_ELEM: (address: chararray,name: chararray)},
         ccs: {ARRAY_ELEM: (address: chararray,name: chararray)},
         bccs: {ARRAY_ELEM: (address: chararray,name: chararray)}
}

We can JSONify bags:

emails = load '/me/Data/enron.avro' using AvroStorage();
emails = limit emails 10;
json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;
dump json_test;

Which gets us JSON arrays of JSON objects:

(<589.1075842593084.JavaMail.evans@thyme>,[{"address":"gerald.nemec@enron.com","name":null}])
(<614.1075847580822.JavaMail.evans@thyme>,[{"address":"jfichera@saberpartners.com","name":null},{"address":"barry.goode@gov.ca.gov","name":null},{"address":"hoffman@blackstone.com","name":null},{"address":"joseph.fichera@gov.ca.gov","name":null}])
(<735.1075840186524.JavaMail.evans@thyme>,[{"address":"kam.keiser@enron.com","name":"Kam Keiser"},{"address":"mike.grigsby@enron.com","name":"Mike Grigsby"}])
(<758.1075842602845.JavaMail.evans@thyme>,[{"address":"joan.quick@enron.com","name":null}])
(<765.1075860359973.JavaMail.evans@thyme>,[{"address":"jay_dudley@pgn.com","name":null},{"address":"michele_farrell@pgn.com","name":null}])
...

We can JSONify tuples:

emails2 = load '/me/Data/enron.avro' using AvroStorage();
emails2 = limit emails2 10;
json_test2 = foreach emails2 generate message_id, com.hortonworks.pig.udf.ToJson(from) as tuple_json;
dump json_test2;

Which gets us JSON objects:

(<28.1075842613917.JavaMail.evans@thyme>,{"address":"emmye@dracospring.com","name":"\"Emmye\""})
(<85.1075854368299.JavaMail.evans@thyme>,{"address":"darron.giron@enron.com","name":null})
(<167.1075851646300.JavaMail.evans@thyme>,{"address":"jeff.dasovich@enron.com","name":"Jeff Dasovich"})
(<185.1075857304356.JavaMail.evans@thyme>,{"address":"chris.dorland@enron.com","name":"Chris Dorland"})
(<735.1075840186524.JavaMail.evans@thyme>,{"address":"m..love@enron.com","name":"Phillip M. Love"})

And we can JSONify more complex, nested structures:

-- This works for arbitrarily complex data structures as well
a = foreach (group emails by from.address) generate group as from_address, COUNT_STAR(emails) as sent_count, FLATTEN(emails.tos) as tos;
b = group a by from_address;
c = foreach b generate group as from_address, com.hortonworks.pig.udf.ToJson(a) as json_test;
store c into '/tmp/big_test_num';

Which gets us an array of JSON objects where one field refers to an array of other objects, and one field stores a number:

bc@ori.org	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"bc@ori.org"}]
ben@crs.hn	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"ben@crs.hn"}]
cba@uh.edu	[{"tos":[{"address":"rod.hayslett@enron.com","name":null}],"sent_count":1,"from_address":"cba@uh.edu"}]
cei@uh.edu	[{"tos":[{"address":"ceiinfo@uh.edu","name":"Shena Cherian"}],"sent_count":1,"from_address":"cei@uh.edu"}]
nc@mmf.com	[{"tos":[{"address":"kenneth.lay@enron.com","name":null}],"sent_count":1,"from_address":"nc@mmf.com"}]
nyb@uni.de	[{"tos":[{"address":"extramoney@mailman.enron.com","name":null}],"sent_count":1,"from_address":"nyb@uni.de"}]

We are JSONizing fields like crazy. It’s madness! Woohoo!

You can download the project from GitHub here: https://github.com/rjurney/pig-to-json. To build it, run:

ant clean; ant dist

ToJson Internals

Setting up a Pig EvalFunc project is easy. I copied my ivy.xml from another project to pull the dependencies down from Maven:

<ivy-module version="2.0">
  <info organisation="org.apache" module="hello-ivy"/>
  <dependencies>
    <dependency org="com.googlecode.json-simple" name="json-simple" rev="1.1"/>
    <dependency org="org.apache.pig" name="pig" rev="0.8.0"/>
    <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
    <dependency org="commons-lang" name="commons-lang" rev="2.5"/>
  </dependencies>
</ivy-module>
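
json-simple is the piece that does the actual serialization: its JSONObject and JSONArray containers nest, and calling toString() on them emits JSON text. Here is a quick standalone illustration (my own snippet, not from the project):

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

public class JsonSimpleDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        JSONObject to = new JSONObject();
        to.put("address", "kam.keiser@enron.com");
        to.put("name", "Kam Keiser");

        JSONArray tos = new JSONArray();
        tos.add(to);

        // Prints something like (key order may vary):
        // [{"address":"kam.keiser@enron.com","name":"Kam Keiser"}]
        System.out.println(tos.toString());
    }
}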

I also copied my build.xml from another project and edited a few values to make it work with my class. The meat of the UDF is in ToJson.java, which I likewise adapted from existing code. There’s an interesting bit here:

    public Schema outputSchema(Schema inputSchema) {
 
        // We must strip the outer {} or this schema won't parse on the back-end
        // This is a trick until PIG-2910 is fixed. Thanks to Thejas for this!
        String schemaString = inputSchema.toString().substring(1, inputSchema.toString().length() - 1);
 
        // Set the input schema for processing
        UDFContext context = UDFContext.getUDFContext();
        Properties udfProp = context.getUDFProperties(this.getClass());
 
        udfProp.setProperty("horton.json.udf.schema", schemaString);
 
        // Construct our output schema which is one field, that is a chararray
        return new Schema(new FieldSchema(null, DataType.CHARARRAY));
    }

We use EvalFunc.outputSchema to pass our input’s schema from Pig’s front end to its distributed back end using the UDFContext’s properties. Our output schema is always a chararray; this is ToJson, after all. We retrieve the input schema in our EvalFunc.exec method:

// Parse the schema from the string stored in the properties object
// (strSchema comes from the "horton.json.udf.schema" property we set
// in outputSchema above)
schema = Utils.getSchemaFromString(strSchema);
...
// Convert the first field to JSON, using the schema for that field
Object field = input.get(0);
Object jsonObject = fieldToJson(field, schema.getFields().get(0));
String json = jsonObject.toString();
return json;
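
The fieldToJson helper is where the recursion happens: tuples become JSON objects keyed by field alias, bags become JSON arrays, and scalars pass through untouched. The real implementation is in ToJson.java in the repo; a rough sketch of the idea (my own, hypothetical, not the project’s exact code) looks like this:

// Assumes the usual imports: java.util.Iterator, java.util.List,
// org.apache.pig.data.Tuple, org.apache.pig.data.DataBag,
// org.apache.pig.data.DataType,
// org.apache.pig.backend.executionengine.ExecException,
// org.apache.pig.impl.logicalLayer.schema.Schema,
// org.json.simple.JSONObject, org.json.simple.JSONArray
@SuppressWarnings("unchecked")
private Object fieldToJson(Object field, Schema.FieldSchema fieldSchema) throws ExecException {
    if (field == null) {
        return null; // json-simple serializes nulls as JSON null
    }
    switch (fieldSchema.type) {
        case DataType.TUPLE: {
            // A tuple becomes a JSON object with one key per field alias
            Tuple tuple = (Tuple) field;
            List<Schema.FieldSchema> fields = fieldSchema.schema.getFields();
            JSONObject json = new JSONObject();
            for (int i = 0; i < fields.size(); i++) {
                json.put(fields.get(i).alias, fieldToJson(tuple.get(i), fields.get(i)));
            }
            return json;
        }
        case DataType.BAG: {
            // A bag becomes a JSON array of the JSON form of its tuples
            JSONArray array = new JSONArray();
            Schema.FieldSchema tupleSchema = fieldSchema.schema.getFields().get(0);
            Iterator<Tuple> it = ((DataBag) field).iterator();
            while (it.hasNext()) {
                array.add(fieldToJson(it.next(), tupleSchema));
            }
            return array;
        }
        default:
            // Scalars (chararray, long, ...) pass through; maps and other
            // types are omitted from this sketch
            return field;
    }
}

Because JSONObject and JSONArray serialize whatever is nested inside them, calling toString() on the value fieldToJson returns yields the finished JSON string.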

Pig UDFs are pretty easy to write, as you can see! Be lazy. Find an Apache-licensed Pig UDF on GitHub or in Piggybank and make it your own!
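
If you’d rather start from scratch, the skeleton is tiny. Here is a minimal, hypothetical EvalFunc (not part of pig-to-json) that upper-cases its first argument:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class ToUpper extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        return ((String) input.get(0)).toUpperCase();
    }
}

Compile it against pig.jar, register the jar in your script, and call it in a foreach like any built-in function.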
