Read a MongoDB Collection from Spark: an example
I was recently presented with the task of reading an existing MongoDB collection from Spark, querying its content via Spark-SQL, and exporting it as a CSV. I ended up using the Spark-MongoDB library, which allows reading and writing data with Spark-SQL from and into MongoDB. The collection in question is the sample one offered for free by MongoDB here, and the jars used in my pom.xml are as follows:
[code language="xml"]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>mongodb-spark</groupId>
    <artifactId>mongodb-spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Spark core, Scala 2.10 build -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- Stratio Spark-MongoDB data source -->
        <dependency>
            <groupId>com.stratio.datasource</groupId>
            <artifactId>spark-mongodb_2.10</artifactId>
            <version>0.11.2</version>
        </dependency>
        <!-- MongoDB Java drivers -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- Casbah, the Scala MongoDB driver used by Spark-MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-commons_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-query_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <!-- spark-csv data source for the CSV export -->
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
</project>
[/code]
Here is my Java class:
[code language="java"]
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

public class MongoDBSparkIntegration {

    public static void main(String[] args) {
        String outputPath = "D:\\Dev\\MongoDb-Spark-Integration\\src\\main\\resources\\output";

        // Run Spark locally and set up the SQL context.
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("MongoDB-Spark-Application");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        // Connection details for the MongoDB collection to read.
        Map<String, String> options = new HashMap<String, String>();
        options.put("host", "localhost:27017");
        options.put("database", "mydb");
        options.put("collection", "test");

        // Load the collection as a DataFrame via the Stratio data source.
        DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

        // Register it as a temporary table so it can be queried with Spark-SQL.
        df.registerTempTable("tmp");
        sqlContext.sql("SELECT * FROM tmp").show();

        // Export the content as CSV using the spark-csv data source.
        df.write().format("com.databricks.spark.csv").save(outputPath);
        sqlContext.dropTempTable("tmp");
    }
}
[/code]
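Since the collection is registered as a temporary table, any Spark-SQL query can be run against it before the export; the SELECT * above is just the simplest case. Here is a minimal sketch of a filtered export, assuming the collection has city and pop fields (as in MongoDB's zips sample dataset; these column names are an assumption, so adjust them to your own schema):

[code language="java"]
// Hypothetical example: filter and project the temp table before exporting.
// "city" and "pop" are assumed column names; adjust to your collection's schema.
DataFrame filtered = sqlContext.sql("SELECT city, pop FROM tmp WHERE pop > 100000");
filtered.show();
filtered.write()
        .format("com.databricks.spark.csv")
        .option("header", "true") // spark-csv option to write a header row
        .save(outputPath + "-filtered");
[/code]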
Here is the output printed to the IntelliJ console: