Read a MongoDB Collection from Spark: an example

I was recently given the task of reading an existing MongoDB collection from Spark, querying its content with Spark-SQL and exporting it as CSV. I ended up using Stratio's Spark-MongoDB library, which allows reading and writing data with Spark-SQL from and into MongoDB, together with the Databricks spark-csv package for the export. The collection in question is the one offered for free by MongoDB here, and the dependencies declared in my pom.xml are as follows:

</pre>
<pre>&lt;?xml version="1.0" encoding="UTF-8"?&gt;
&lt;project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&gt;
    &lt;modelVersion&gt;4.0.0&lt;/modelVersion&gt;

    &lt;groupId&gt;mongodb-spark&lt;/groupId&gt;
    &lt;artifactId&gt;mongodb-spark&lt;/artifactId&gt;
    &lt;version&gt;1.0-SNAPSHOT&lt;/version&gt;
    &lt;dependencies&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
            &lt;artifactId&gt;spark-core_2.10&lt;/artifactId&gt;
            &lt;version&gt;1.6.1&lt;/version&gt;
            &lt;scope&gt;compile&lt;/scope&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;junit&lt;/groupId&gt;
            &lt;artifactId&gt;junit&lt;/artifactId&gt;
            &lt;version&gt;4.12&lt;/version&gt;
            &lt;scope&gt;test&lt;/scope&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;com.stratio.datasource&lt;/groupId&gt;
            &lt;artifactId&gt;spark-mongodb_2.10&lt;/artifactId&gt;
            &lt;version&gt;0.11.2&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.mongodb&lt;/groupId&gt;
            &lt;artifactId&gt;mongodb-driver&lt;/artifactId&gt;
            &lt;version&gt;3.2.2&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.mongodb&lt;/groupId&gt;
            &lt;artifactId&gt;mongo-java-driver&lt;/artifactId&gt;
            &lt;version&gt;3.2.2&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
            &lt;artifactId&gt;spark-sql_2.10&lt;/artifactId&gt;
            &lt;version&gt;1.6.1&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.mongodb&lt;/groupId&gt;
            &lt;artifactId&gt;casbah-commons_2.10&lt;/artifactId&gt;
            &lt;version&gt;3.1.1&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.mongodb&lt;/groupId&gt;
            &lt;artifactId&gt;casbah-core_2.10&lt;/artifactId&gt;
            &lt;version&gt;3.1.1&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.mongodb&lt;/groupId&gt;
            &lt;artifactId&gt;casbah-query_2.10&lt;/artifactId&gt;
            &lt;version&gt;3.1.1&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;com.databricks&lt;/groupId&gt;
            &lt;artifactId&gt;spark-csv_2.10&lt;/artifactId&gt;
            &lt;version&gt;1.4.0&lt;/version&gt;
        &lt;/dependency&gt;
    &lt;/dependencies&gt;
&lt;/project&gt;</pre>
<pre>

Here is my Java class:

</pre>
<pre>import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

import java.util.HashMap;
import java.util.Map;

public class MongoDBSparkIntegration {

    public static void main(String[] args) {

        // Directory that spark-csv writes its part files into
        String outputPath = "D:\\Dev\\MongoDb-Spark-Integration\\src\\main\\resources\\output";

        // Run Spark locally with a single worker thread
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("MongoDB-Spark-Application");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        // Connection options for the Stratio Spark-MongoDB data source
        Map&lt;String, String&gt; options = new HashMap&lt;String, String&gt;();
        options.put("host", "localhost:27017");
        options.put("database", "mydb");
        options.put("collection", "test");

        // Load the collection as a DataFrame
        DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

        // Expose the DataFrame to Spark-SQL and query it
        df.registerTempTable("tmp");
        sqlContext.sql("SELECT * FROM tmp").show();

        // Export as CSV; Overwrite replaces the output directory if a previous run left it behind
        df.write().format("com.databricks.spark.csv").mode(SaveMode.Overwrite).save(outputPath);

        sqlContext.dropTempTable("tmp");
        sc.stop();
    }
}</pre>
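Note that `save()` writes `outputPath` as a directory of `part-*` files (one per partition, plus a `_SUCCESS` marker) rather than a single CSV file. If one file is wanted, a minimal sketch using only `java.nio` could concatenate the parts. `MergeCsvParts` and `merge` are hypothetical names, not part of Spark or spark-csv, and the sketch assumes the parts were written without a header row (spark-csv's default); otherwise every part would carry its own header line.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class MergeCsvParts {

    // Hypothetical helper: concatenates the part-* files found in outputDir
    // into a single target file, in file-name order.
    public static void merge(Path outputDir, Path target) throws IOException {
        // Keep only data files; this skips _SUCCESS and hidden .part-*.crc checksum files
        File[] parts = outputDir.toFile().listFiles((dir, name) -> name.startsWith("part-"));
        if (parts == null) {
            throw new IOException("Not a readable directory: " + outputDir);
        }
        // part-00000, part-00001, ... sort correctly by name
        Arrays.sort(parts);
        // newOutputStream creates the target, or truncates it if it already exists
        try (OutputStream out = Files.newOutputStream(target)) {
            for (File part : parts) {
                Files.copy(part.toPath(), out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: MergeCsvParts OUTPUT_DIR TARGET_CSV");
            return;
        }
        merge(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```

For example, passing the `outputPath` directory from the class above and a target such as `output.csv` should yield one file containing the rows of every partition, in partition order.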
<pre>

This is the output printed to the IntelliJ console:

[Screenshot of the IntelliJ console output]

Posted on April 7, 2017, in Uncategorized.
