I was recently working on a demo for a new client who had a decent amount of data that they wanted processed to generate reports. Since I’ve been playing around with Spark lately for processing data, I decided I’d use it for this demo. I wanted to use MongoDB as my data store as I love how easy it is to get things done with it.
However, I couldn’t find an easy way to read the data from MongoDB and use it in my Spark code. I wanted something that felt natural in the Spark/Scala world. I decided to create my own RDD for MongoDB, and thus, MongoRDD was born.
You can find it here: https://github.com/caffinc/MongoRDD
How do I use it?
MongoRDD extends the Spark RDD class and provides a way to read from MongoDB directly into Spark.
Usage in Scala:
    val sc = new SparkContext(conf)
    val mongoClientUri = "mongodb://localhost:27017"
    val database = "DBName"
    val collection = "CollectionName"
    val query = new Document(...)
    val partitions = 4
    new MongoRDD(sc, mongoClientUri, database, collection, query, partitions).map(...)
Usage in Java (Assume constants are the same):
    new JavaRDD<>(
        new MongoRDD(sc, mongoClientUri, database, collection, query, partitions),
        ClassManifestFactory$.MODULE$.fromClass(Document.class)
    ).map(...)
MongoClientURI is one of the simplest ways to connect to a MongoDB instance. Feel free to extend this, and raise a Pull Request if you think it should be included in this repo.
The dependency versions below are not absolute, but they are current (probably) as of 3rd March, 2016. I tried to keep everything as current as possible so people should be able to track down all dependencies easily. It should be trivial to upgrade or downgrade versions as required.
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.18</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>de.flapdoodle.embed</groupId>
            <artifactId>de.flapdoodle.embed.mongo</artifactId>
            <version>1.50.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
There is just one extensive test, which launches an embedded MongoDB instance, writes dummy values into it, and tests MongoRDD on a local Spark instance. The test Works on my Machine™ and on Travis-CI (which is awesome!).
It might not work on your machine for the following reasons:
- It uses an embedded MongoDB instance, which requires a download of several megabytes the first time it runs. This might be slow, and the test might time out. Comment out the line which makes setUp() fail on slow starts and try it out.
- You might have an older version of Spark in your dependencies which might have a bug while running on Windows. Are you able to run Spark for other stuff without issues?
- You’re channeling evil spirits which don’t like MongoDB. Pray to your God and hope for the best, or send me an email (email@example.com) if you think I can help :)
There are a few things that can be done to extend this:
- Provide other means of connecting to MongoDB
- Make the RDD generic, and provide an interface to convert BSON documents to other formats before returning (this can already be achieved with a simple call to map(), so it wasn’t done)
- Make it available in Maven Central or Bintray
- Add more tests
If you can help with one or more of the above, or if you have suggestions of your own, send me an email or raise a PR and I will review it and add it.
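As a rough illustration of the map()-based conversion mentioned in the list above, here is a minimal sketch of a function that turns a document into a typed object. It deliberately avoids the Spark and MongoDB dependencies so it can run on its own: a plain Map<String, Object> stands in for org.bson.Document (which implements that interface), and a Java stream stands in for the RDD. The User class, the toUser and doc helpers, and the "name" and "age" field names are all hypothetical and not part of MongoRDD.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DocumentMapping {

    /** Hypothetical target type for the conversion; not part of MongoRDD. */
    static final class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /**
     * Converts one document-like map into a User.
     * The field names "name" and "age" are assumptions made for this sketch.
     */
    static User toUser(Map<String, Object> doc) {
        String name = (String) doc.get("name");
        // Numeric fields may be stored as Integer, Long or Double, so read via Number.
        Number age = (Number) doc.get("age");
        return new User(name, age == null ? -1 : age.intValue());
    }

    /** Builds a small stand-in document for the demo below. */
    static Map<String, Object> doc(String name, Object age) {
        Map<String, Object> d = new HashMap<>();
        d.put("name", name);
        d.put("age", age);
        return d;
    }

    public static void main(String[] args) {
        // A stream stands in for the RDD here; the map() call has the same shape.
        List<User> users = Stream.of(doc("Ada", 36), doc("Linus", 54L))
                .map(DocumentMapping::toUser)
                .collect(Collectors.toList());

        for (User u : users) {
            System.out.println(u.name + " " + u.age);
        }
    }
}
```

With the real RDD, the same function would presumably go into the map(...) call from the usage snippets. In that setting the target class would likely also need to implement Serializable so that Spark can ship the results between nodes.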
If you face any issues trying to get this to work for you, shoot me an email: firstname.lastname@example.org.