Thursday, June 18, 2015

New – Apache Spark on Amazon EMR

My colleague Jon Fritz wrote the guest post below to introduce a powerful new feature for Amazon EMR.

Jeff;


I’m happy to announce that Amazon EMR now supports Apache Spark. Amazon EMR is a web service that makes it easy for you to process and analyze vast amounts of data using applications in the Hadoop ecosystem, including Hive, Pig, HBase, Presto, Impala, and others. We’re delighted to officially add Spark to this list. Many customers have previously installed Spark using custom scripts; now you can launch an Amazon EMR cluster with Spark directly from the Amazon EMR Console, CLI, or API.

Apache Spark: Beyond Hadoop MapReduce
We have seen great customer successes using Hadoop MapReduce for large-scale data processing, batch reporting, ad hoc analysis on unstructured data, and machine learning. Apache Spark, a newer distributed processing framework in the Hadoop ecosystem, is also proving to be an enticing engine, increasing job performance and development velocity for certain workloads.

By using a directed acyclic graph (DAG) execution engine, Spark can create a more efficient query plan for data transformations. Spark also uses in-memory, fault-tolerant resilient distributed datasets (RDDs), keeping intermediates, inputs, and outputs in memory instead of on disk. These two capabilities can deliver better performance for certain workloads than Hadoop MapReduce, which forces jobs into a sequence of map and reduce stages and incurs I/O costs from writing intermediate results to disk. Spark’s performance advantages are particularly relevant for iterative workloads, which are common in machine learning and low-latency querying use cases.
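
To make the in-memory advantage concrete, here is a minimal sketch of an iterative computation using Spark’s Scala API (the input path and the computation itself are hypothetical, not from this post). The parsed data is cached as an RDD after the first read, so each subsequent iteration works from memory rather than re-reading the input or writing intermediate results to disk:

import org.apache.spark.{SparkConf, SparkContext}

object IterativeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("IterativeExample"))

    //Read the input once and keep the parsed values in memory as a fault-tolerant RDD
    val values = sc.textFile("s3://my-bucket/sample-input/").map(_.toDouble).cache()

    //Each pass reuses the cached RDD; nothing is written to disk between iterations,
    //unlike a chain of MapReduce jobs
    var threshold = 0.0
    for (i <- 1 to 10) {
      val current = threshold
      threshold = values.filter(_ > current).mean()
    }

    println(s"Converged threshold: $threshold")
    sc.stop()
  }
}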

Additionally, Spark natively supports Scala, Python, and Java APIs, and it includes libraries for SQL, popular machine learning algorithms, graph processing, and stream processing. With many tightly integrated development options, it can be easier to create and maintain applications for Spark than to work with the various abstractions wrapped around the Hadoop MapReduce API.

Introducing Spark on Amazon EMR
Today, we are introducing support for Apache Spark in Amazon EMR. You can quickly and easily create scalable, managed Spark clusters on a variety of Amazon Elastic Compute Cloud (EC2) instance types from the Amazon EMR console, the AWS Command Line Interface (CLI), or directly through the Amazon EMR API. As an engine running in the Amazon EMR container, Spark can take advantage of Amazon EMR FS (EMRFS) to directly access data in Amazon Simple Storage Service (S3), push logs to Amazon S3, utilize EC2 Spot capacity for lower costs, and leverage Amazon EMR’s integration with AWS security features such as IAM roles, EC2 security groups, and S3 encryption at rest (server-side and client-side). Even better, there is no additional charge to run Spark in Amazon EMR.

Spark includes Spark SQL for low-latency, interactive SQL queries, MLlib for out-of-the-box scalable, distributed machine learning algorithms, Spark Streaming for building resilient stream processing applications, and GraphX for graph processing. You can also install Ganglia on your Amazon EMR clusters for additional Spark monitoring. You can send workloads to Spark by submitting Spark steps through the EMR Step API for batch jobs, or by interacting directly with the Spark API or the Spark shell on the master node of your cluster for interactive workflows.
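
For interactive use, you can SSH to the master node, start the Spark shell with the spark-shell command, and work with data directly. As a quick, hypothetical illustration (the bucket name is a placeholder), this Scala session counts words in text files stored in S3, which Spark reads directly through EMRFS:

//On the master node, run spark-shell and enter the following at the scala> prompt
//(the shell provides the SparkContext as sc)
val lines = sc.textFile("s3://my-bucket/sample-text/")
val counts = lines.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
counts.take(10).foreach(println)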

Example Customer Use Cases
Even before today’s launch, many customers have been working with Spark on Amazon EMR, using bootstrap actions for installation. Here are a few examples:

  • The Washington Post is using Spark to power a recommendation engine to show additional content to their readers.
  • Yelp, a consumer application that connects users with local businesses, leverages the machine learning libraries included in MLlib with Spark to increase the click-through rates of display advertisements.
  • Hearst Corporation uses Spark Streaming to quickly process clickstream data from over 200 web properties. This allows them to create a real-time view of article performance and trending topics.
  • Krux uses Spark in its Data Management Platform to process log data stored in Amazon S3 using EMRFS.

Analytics With Spark – A Quick Example
To show an example of how quickly you can start processing data using Spark on Amazon EMR, let’s ask a few questions about flight delays and cancellations for domestic flights in the US.

The Department of Transportation has a public data set outlining flight information since 1987. I downloaded it, converted the file format from CSV to the columnar Parquet format (for better performance), and uploaded it to a public, read-only S3 bucket (s3://us-east-1.elasticmapreduce.samples/flightdata/input). The data set is around 4 GB compressed (79 GB uncompressed) and contains 162,212,419 rows, so it makes sense to use a distributed framework like Spark for querying. Specifically, I would like to know the 10 airports with the most departures, the most flight delays over 15 minutes, the most flight delays over 60 minutes, and the most flight cancellations. I also want to know the number of flight cancellations by yearly quarter, and the 10 most popular flight routes.

I translated these questions into SQL queries, and wrote a Spark application in Scala to run these queries. You can download the Scala code from s3://us-east-1.elasticmapreduce.samples/flightdata/sparkapp/FlightExample.scala; I have an excerpt below:

//Load the flight data set (Parquet format) directly from S3 using EMRFS
val parquetFile = hiveContext.parquetFile("s3://us-east-1.elasticmapreduce.samples/flightdata/input/")

//Parquet files can also be registered as tables and then used in SQL statements. 
parquetFile.registerTempTable("flights")

//Top 10 airports with the most departures since 2000
val topDepartures = hiveContext.sql("SELECT origin, count(*) AS total_departures FROM flights WHERE year >= '2000' GROUP BY origin ORDER BY total_departures DESC LIMIT 10")
topDepartures.rdd.saveAsTextFile(s"$OutputLocation/top_departures")

Note that the application registers a table named “flights”, backed by an in-memory RDD, and each SQL query reuses that table to reduce I/O. Also, Spark on EMR uses EMRFS to access data in S3 directly, without copying it into HDFS first; notice that both the input data set and the output location are in S3. I compiled the application into a JAR, which you can download from https://s3.amazonaws.com/us-east-1.elasticmapreduce.samples/flightdata/sparkapp/flightsample_2.10-1.3.jar.
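
For context, the excerpt assumes that a HiveContext and an output location have already been created earlier in the application. A minimal sketch of that setup, plus one more of the queries (the 10 most popular flight routes) written in the same style, might look like the following; the exact code, and the dest column name, are assumptions on my part, so refer to the downloadable FlightExample.scala for the real thing:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object FlightSample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FlightSample"))
    val hiveContext = new HiveContext(sc)

    //The S3 output path is passed to the application as its first argument
    val OutputLocation = args(0)

    val parquetFile = hiveContext.parquetFile("s3://us-east-1.elasticmapreduce.samples/flightdata/input/")
    parquetFile.registerTempTable("flights")

    //Top 10 most popular flight routes (the dest column name is assumed from the DOT data set)
    val popularRoutes = hiveContext.sql("SELECT origin, dest, count(*) AS total_flights FROM flights GROUP BY origin, dest ORDER BY total_flights DESC LIMIT 10")
    popularRoutes.rdd.saveAsTextFile(s"$OutputLocation/popular_routes")

    sc.stop()
  }
}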

Let’s launch a three-node m3.xlarge Amazon EMR cluster with Spark to run this application. Make sure you launch your cluster in the US East region, because the sample data set for this example is located in that region. Go to the Create Cluster page in the Amazon EMR console. Disable Termination Protection and Logging (you can find these in the Cluster Configuration section). Scroll down to the Software Configuration section and add Spark as an Additional Application.

In the modal that appears after you click Configure and Add, add -x in the Arguments text box. This parameter overrides the default configuration for Spark executors, the processes that carry out the actual execution of your application.

Spark’s default on AMI 3.8 is the Apache default: 2 executors, each with 1 GB of RAM. It therefore makes sense to override these settings to use more of your cluster’s resources when running your Spark application. The -x option sets the number of executors to the number of core nodes at cluster creation, and sets the RAM and vcores allocated to each executor to the maximum that the selected core node instance type can support. The performance of your job will vary depending on your executor settings, so it’s advisable to test them to find the optimal configuration. You can also override these settings at runtime by passing extra parameters to the spark-submit command.
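
As a rough illustration (not taken from the walkthrough above, and with placeholder values rather than recommendations), the same executor properties that -x and the spark-submit flags control can also be set programmatically when your application builds its SparkConf:

import org.apache.spark.{SparkConf, SparkContext}

//Placeholder values for illustration only; tune these for your instance types and workload
val conf = new SparkConf()
  .setAppName("FlightSample")
  .set("spark.executor.instances", "2")   //number of executors
  .set("spark.executor.memory", "4g")     //RAM per executor
  .set("spark.executor.cores", "4")       //vcores per executor
val sc = new SparkContext(conf)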

Next, scroll to the Steps section near the bottom of the page to add two steps. The first step copies the Spark application JAR file from S3 to the master node of your cluster. Add a Custom Jar step; in the modal, enter s3://elasticmapreduce/libs/script-runner/script-runner.jar as the JAR location and /home/hadoop/bin/hdfs dfs -get s3://us-east-1.elasticmapreduce.samples/flightdata/sparkapp/flightsample_2.10-1.3.jar /mnt/ in the Arguments text box. Change Action on failure to Terminate cluster, and click Add.

For the second step, add a Spark application step:

In the modal, select Client for Deploy Mode; client mode submits applications from a local path, which is why the first step copies your application from S3 onto the cluster. For Application location, enter /mnt/flightsample_2.10-1.3.jar. For Arguments, add an S3 path to a bucket in your account where you would like Spark to write the output. Change Action on failure to Terminate cluster, then click Add.

Lastly, change Auto-terminate to Yes, so the cluster will automatically shut down once your Spark application is finished running. Then, click Create Cluster.

Amazon EMR will launch the cluster, run your Spark application, and terminate the cluster when the job is complete. You can monitor the progress of your job from the Cluster Details page in the EMR console. Once the job is complete, you can view the query output in the S3 bucket you specified. I won’t spoil the results, but definitely bring something to read if you’re flying out of Chicago O’Hare.

Launch an Amazon EMR Cluster with Spark Today!
For more information about Spark on Amazon EMR, visit the Spark on Amazon EMR page.

Jon Fritz, Senior Product Manager
