
Read a MongoDB Collection from Spark: an example

I was recently presented with the task of reading an existing MongoDB collection from Spark, querying its content via Spark SQL and exporting it as a CSV. I ended up using the Spark-MongoDB library, which allows reading and writing data with Spark SQL from and into MongoDB. The collection in question is the one offered for free by MongoDB here, and the dependencies declared in my pom.xml are as follows:


[code language="xml"]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>mongodb-spark</groupId>
    <artifactId>mongodb-spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.stratio.datasource</groupId>
            <artifactId>spark-mongodb_2.10</artifactId>
            <version>0.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-commons_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-query_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
</project>
[/code]
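Before wiring up Spark, it is worth confirming that the sample collection has actually been imported and is reachable from the machine running the job. Since the pom already pulls in the MongoDB Java driver, a few lines of plain driver code are enough for a sanity check. This is only a minimal sketch: the database name mydb and collection name test mirror the values used in the Spark options further down, so adjust them to wherever you loaded the sample data.

[code language="java"]
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoConnectionCheck {

    public static void main(String[] args) {
        // Connect to the same mongod instance the Spark job will read from
        MongoClient client = new MongoClient("localhost", 27017);
        try {
            MongoDatabase db = client.getDatabase("mydb");              // assumed database name
            MongoCollection<Document> coll = db.getCollection("test");  // assumed collection name

            // Print the document count and one sample document as a quick smoke test
            System.out.println("Documents in collection: " + coll.count());
            System.out.println("Sample document: " + coll.find().first());
        } finally {
            client.close();
        }
    }
}
[/code]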

Here is my Java class:

[code language="java"]
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

public class MongoDBSparkIntegration {

    public static void main(String[] args) {

        String outputPath = "D:\\Dev\\MongoDb-Spark-Integration\\src\\main\\resources\\output";

        // Local Spark context and SQL context
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("MongoDB-Spark-Application");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        // Connection details for the MongoDB collection to read
        Map<String, String> options = new HashMap<String, String>();
        options.put("host", "localhost:27017");
        options.put("database", "mydb");
        options.put("collection", "test");

        // Load the collection as a DataFrame through the Stratio Spark-MongoDB data source
        DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

        // Register a temporary table so the collection can be queried with Spark SQL
        df.registerTempTable("tmp");
        sqlContext.sql("SELECT * FROM tmp").show();

        // Export the DataFrame as CSV via the spark-csv data source, then clean up
        df.write().format("com.databricks.spark.csv").save(outputPath);
        sqlContext.dropTempTable("tmp");
    }
}
[/code]

This is the printout returned from the IntelliJ console:

[Screenshot: IntelliJ console output]
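As a possible variation, the temporary table also makes it easy to filter the data with Spark SQL before exporting, and spark-csv can be asked to write a header row. The sketch below reuses sqlContext and outputPath from the class above and would slot in before the call to sqlContext.dropTempTable("tmp"); the column name city is only a placeholder and depends on the fields of the collection you imported.

[code language="java"]
// Hypothetical refinement: query the registered temp table instead of exporting everything
DataFrame filtered = sqlContext.sql("SELECT * FROM tmp WHERE city IS NOT NULL");

// Write the filtered rows as CSV, including the column names as the first line
filtered.write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save(outputPath + "-filtered");
[/code]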