This article explains how to use the Apache Spark Driver for Arm Treasure Data (td-spark) on Amazon Elastic MapReduce (EMR). We recommend running in the us-east region of Amazon EC2 for optimal performance, but the driver also works in other Spark environments.

Overview
- TD Spark Release Notes
- TD Spark FAQs
- General usage of td-spark
What does the driver enable today?
- Access Treasure Data from Spark in Scala and Python (PySpark).
- Pull Treasure Data tables into Spark as DataFrames. No TD query is issued, making this the lowest-latency path between data stored in TD and your Spark cluster.
- Issue Presto, Hive, or SparkSQL queries and return the results as Spark DataFrames. (Both access paths are previewed in the sketch below.)
This driver is recommended for use with Spark 2.1.0 or higher.
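To make these concrete, here is a minimal sketch of both access paths for a spark-shell session with td-spark on the classpath; the variable names and the query are illustrative, and the query-submission pattern mirrors the PySpark example later in this article:

import com.treasuredata.spark._

// td-spark's entry point, available once the jar is on the classpath
val td = spark.td

// Path 1: pull a TD table directly into Spark as a DataFrame (no TD query is issued)
val accessLog = td.table("sample_datasets.www_access").df

// Path 2: submit a Presto query to TD and read the result back as a DataFrame
val byCode = spark.read.format("com.treasuredata.spark")
  .option("sql", "select code, count(*) cnt from www_access group by 1")
  .load("sample_datasets")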
Recommendations regarding use
For the fastest data access and the lowest data transfer costs, we recommend that you set up your Spark cluster in the AWS us-east region. Data transfer costs can become quite high when using other AWS regions or processing environments.
TD Spark Driver on EMR
Create an EMR Spark cluster
- Create an EMR cluster with Spark support. Using the us-east region is highly recommended to maximize data transfer performance from S3.
- Check the master node address of the new EMR cluster.
If you created the EMR cluster with the default security group (ElasticMapReduce-master), make sure to permit inbound access from your environment, as shown below. See “Amazon EMR-Managed Security Groups”.
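For example, assuming you connect from a single workstation whose public IP you know, an inbound SSH rule can be added to the managed security group with the AWS CLI; the group ID and CIDR below are placeholders:

# Permit SSH (port 22) from your workstation's public IP to the EMR master security group
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-xxxxxxxx \
    --protocol tcp \
    --port 22 \
    --cidr 203.0.113.10/32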
Other references
- Create An EMR Cluster with Spark
- Building a Recommendation Engine with Spark ML on Amazon EMR using Zeppelin
Log in to the EMR Cluster
Connect to the EMR master node with SSH
# Use 8157 for the SOCKS5 proxy port so that you can access the EMR Spark job history page (port 18080), Zeppelin notebook (port 8890), etc.
$ ssh -i (your AWS key pair file, .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/

4 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

(EMR ASCII-art banner)
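If you prefer not to use a proxy-switching browser extension (see the Zeppelin section below), a dedicated Chrome instance can be pointed at the tunnel directly; a sketch, assuming the browser binary is google-chrome and using a throwaway profile directory:

# Route a separate Chrome profile through the SOCKS5 tunnel on port 8157
$ google-chrome --proxy-server="socks5://localhost:8157" --user-data-dir=/tmp/emr-proxy-profile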
Set Up TD Spark Integration
Download the td-spark jar file:
[hadoop@ip-x-x-x-x]$ wget https://s3.amazonaws.com/td-spark/td-spark-assembly_2.11-0.4.0.jar
Create a td.conf file on the master node:
# Describe your TD API key here
spark.td.apikey=(your TD API key)
# (recommended) Use KryoSerializer for faster performance
spark.serializer=org.apache.spark.serializer.KryoSerializer
Using spark-shell on EMR
[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly_2.11-0.4.0.jar --properties-file td.conf
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

scala> import com.treasuredata.spark._
scala> val td = spark.td
scala> val d = td.table("sample_datasets.www_access").df
scala> d.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|136.162.131.221|    /category/health|   /category/cameras| 200|Mozilla/5.0 (Wind...|  77|   GET|1412373596|
|null| 172.33.129.134|      /category/toys|   /item/office/4216| 200|Mozilla/5.0 (comp...| 115|   GET|1412373585|
|null| 220.192.77.135|  /category/software|                   -| 200|Mozilla/5.0 (comp...| 116|   GET|1412373574|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 3 rows
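Once loaded, the table behaves like any other Spark DataFrame, so standard transformations and aggregations apply; for example, continuing the session above:

scala> // Count access-log rows per HTTP method with plain Spark operations
scala> d.groupBy("method").count().show()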
Using Zeppelin Notebook on EMR
Configure Zeppelin for td-spark
Create an SSH tunnel to the EMR cluster:
$ ssh -i (your AWS key pair file, .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
- (For Chrome users) Install the Proxy SwitchySharp Chrome extension.
- Turn on the proxy switch for EMR when accessing your EMR master node.
- Open http://(your EMR master node public address):8890/
- Configure td-spark on the Interpreter settings page.
Access a Dataset in TD as a DataFrame
- Read table data as a Spark DataFrame, as shown in the sketch below.
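A Zeppelin paragraph uses the same API as the spark-shell example above; a minimal sketch, assuming the default %spark interpreter (variable names are illustrative):

%spark
import com.treasuredata.spark._

val td = spark.td
val df = td.table("sample_datasets.www_access").df
df.printSchema()
df.show(10)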
Running Presto Queries
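Presto queries can be submitted through the same DataFrame-reader interface demonstrated in the PySpark section below; a minimal Scala sketch (the query itself is illustrative):

scala> // Submit a Presto query to TD and read the result back as a DataFrame
scala> val q = spark.read.format("com.treasuredata.spark").option("sql", "select method, count(*) cnt from www_access group by 1").load("sample_datasets")
scala> q.show()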
Checking Spark History Server
- Open http://(your EMR master node public address):18080/
Using the TD Spark Driver with PySpark and SparkSQL
PySpark
$ ./bin/pyspark --driver-class-path ~/work/git/td-spark/td-spark/target/td-spark-assembly-0.1-SNAPSHOT.jar --properties-file ../td-dev.conf
>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access")
>>> df.show(10)
2016-07-19 16:34:15-0700 info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700 info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 148.165.90.106|/category/electro...|     /category/music| 200|Mozilla/4.0 (comp...|  66|   GET|1412333993|
|null|  144.105.77.66|/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135|   GET|1412333977|
|null| 108.54.178.116|/category/electro...|  /category/software| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333961|
|null|104.129.105.202|/item/electronics...|     /item/games/394| 200|Mozilla/5.0 (comp...|  83|   GET|1412333945|
|null|   208.48.26.63|  /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...|  76|   GET|1412333930|
|null|  108.78.209.95|/item/giftcards/4879|      /item/toys/197| 200|Mozilla/5.0 (Wind...| 137|   GET|1412333914|
|null| 108.198.97.206|/item/computers/4785|                   -| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333898|
|null| 172.195.185.46|     /category/games|     /category/games| 200|Mozilla/5.0 (Maci...|  41|   GET|1412333882|
|null|   88.24.72.177|/item/giftcards/4410|                   -| 200|Mozilla/4.0 (comp...|  72|   GET|1412333866|
|null|  24.129.141.79|/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...|  73|   GET|1412333850|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 10 rows

## Submitting a Presto job
>>> df = spark.read.format("com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700 info [TDSparkContext] Submitted job 515990: select 1 - (TDSparkContext.scala:70)
>>> df.show(10)
+-----+
|_col0|
+-----+
|    1|
+-----+

## Reading job results
>>> df = sqlContext.read.format("com.treasuredata.spark").load("job_id:515990")
SparkSQL
# Register a DataFrame as a temporary view
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

scala> q1.show
2016-07-20 11:27:11-0700 info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700 info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
+--------------------+---------+
|            page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
+--------------------+---------+
only showing top 20 rows
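Once registered, the temporary view can be used in any subsequent SparkSQL statement, including aggregations and joins with other registered views; for example, continuing the session above:

scala> // Aggregate over the registered view with plain SparkSQL
scala> spark.sql("select count(*) as pages, avg(page_rank) as avg_rank from rankings").show()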