You can use td-pyspark to bridge the results of data manipulations in Amazon EMR (Elastic MapReduce) with your data in Arm Treasure Data.
Amazon EMR is an AWS tool for big data processing and analysis, providing an easy-to-use interface for accessing Spark. PySpark is a Python API for Spark. Treasure Data's td-pyspark is a Python library that provides a handy way to use PySpark and Treasure Data based on td-spark.
To follow the steps in this example, you must have the following Treasure Data items:
- Treasure Data API key
- td-spark feature enabled
Configuring your Amazon EMR Environment
In this section, you create a key pair and a cluster, install the td-pyspark libraries, and configure a notebook for your connection code.
Log in to the AWS Management Console. Under Find Services, enter EMR. Amazon EMR cluster nodes run on Amazon EC2 instances.
Create an EC2 Key Pair
When you create the key pair in Amazon, you provide a name, and a file with the .pem extension is generated. Download the generated file to your local computer.
For more information about creating an Amazon EC2 key pair, see Amazon EC2 Key Pairs.
You refer to the key when you create a cluster. You specify the Amazon EC2 key pair that is used for SSH connections to all cluster instances.
Create a Cluster on Amazon EMR
Complete the configuration fields. Provide a cluster name and a folder location for the cluster data, and select Spark 2.4.3 or later as the Application.
Specify the instance and key pair.
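The console fields above map roughly onto the parameters of the EMR RunJobFlow API. The following is a minimal sketch, not the procedure this article follows: it only builds the request dictionary that boto3's run_job_flow call accepts. The release label emr-5.26.0 (assumed to ship Spark 2.4.3), the instance type, the default role names, the bucket and key pair names, and the helper name build_cluster_request are all assumptions for illustration. The "folder location" is assumed to correspond to the S3 log folder.

```python
from typing import Any, Dict


def build_cluster_request(name: str, log_uri: str, key_pair: str,
                          release_label: str = "emr-5.26.0",
                          instance_type: str = "m5.xlarge",
                          instance_count: int = 3) -> Dict[str, Any]:
    """Build keyword arguments for boto3's EMR client run_job_flow call."""
    return {
        "Name": name,                            # cluster name
        "LogUri": log_uri,                       # S3 folder (assumed: log location)
        "ReleaseLabel": release_label,           # assumed to include Spark 2.4.3
        "Applications": [{"Name": "Spark"}],     # Spark as the application
        "Instances": {
            "MasterInstanceType": instance_type,
            "SlaveInstanceType": instance_type,
            "InstanceCount": instance_count,
            "Ec2KeyName": key_pair,              # EC2 key pair used for SSH
            "KeepJobFlowAliveWhenNoSteps": True,  # keep the cluster up for interactive use
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",    # assumed default roles
        "ServiceRole": "EMR_DefaultRole",
    }


# Hypothetical values; replace them with your own.
request = build_cluster_request("td-pyspark-demo", "s3://my-bucket/emr-logs/", "my-key-pair")

# With boto3 installed and AWS credentials configured, the request could be sent with:
#   import boto3
#   boto3.client("emr").run_job_flow(**request)
```

Building the request separately from sending it makes it easy to review the settings before any cluster is started.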
Connect to EMR Master node with SSH
To grant inbound access permission from your local computer, you specify a secure protocol.
Select the security group for Master:
In the Amazon EMR console, find the Master node that you want to access:
With the proxy established, log on to the Master node instance. From your local computer, connect to the Master node instance over SSH:
$ ssh -i ~/<your_aws_keypair_key.pem> hadoop@<Master public DNS>
You see a connection confirmation on your local computer.
Install the td-pyspark Libraries
Access the Treasure Data Apache Spark Driver Release Notes. From the article, you click on links to download code.
Inside your EMR instance, click to download the library to your Master node.
Still within the Master node instance, run the following command to install td-pyspark:
$ pip install td_pyspark
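The launch command later in this article points --driver-class-path at td_pyspark/jars/td-spark-assembly.jar. The following is a minimal sketch for checking that the package is importable and whether the jar sits at the same relative path inside the installed package. That layout is an assumption taken from the path in the launch command, and find_bundled_jar is a hypothetical helper, not part of td_pyspark.

```python
import importlib.util
from pathlib import Path
from typing import Optional


def find_bundled_jar(package: str = "td_pyspark",
                     relative_jar: str = "jars/td-spark-assembly.jar") -> Optional[Path]:
    """Return the jar path inside an installed package, or None if it is not found."""
    spec = importlib.util.find_spec(package)
    if spec is None or spec.origin is None:
        return None  # package is not installed (or has no file location)
    candidate = Path(spec.origin).parent / relative_jar
    return candidate if candidate.is_file() else None


jar = find_bundled_jar()
print(jar if jar else "td_pyspark jar not found; check the pip install output")
```

If a path is printed, it can be passed to --driver-class-path instead of a manually downloaded copy.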
Create a Configuration File and Specify your TD API Key and Site
In the Master node instance, create a td-spark.conf file. In the configuration file, specify your TD API key, TD site parameters, and Spark environment.
An example of the format is as follows. You provide the actual values:
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01)
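If you prefer to generate the file rather than type it, the two properties above can be rendered from Python. This is a minimal sketch under stated assumptions: the helper names render_td_spark_conf and write_td_spark_conf are hypothetical, only the two properties and three sites shown in this article are handled, and the TD_API_KEY environment variable in the usage comment is an illustrative choice.

```python
import os

ALLOWED_SITES = ("us", "jp", "eu01")  # the sites listed in this article


def render_td_spark_conf(apikey: str, site: str = "us") -> str:
    """Return the text of a td-spark.conf file with the two properties shown above."""
    if not apikey:
        raise ValueError("apikey must not be empty")
    if site not in ALLOWED_SITES:
        raise ValueError(f"unknown site {site!r}; expected one of {ALLOWED_SITES}")
    return f"spark.td.apikey {apikey}\nspark.td.site {site}\n"


def write_td_spark_conf(path: str, apikey: str, site: str = "us") -> None:
    """Write the file; a newly created file is readable by the owner only."""
    # The 0o600 mode applies only when the file does not exist yet.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(render_td_spark_conf(apikey, site))


# Example usage on the Master node (TD_API_KEY is a hypothetical variable name):
#   write_td_spark_conf("td-spark.conf", os.environ["TD_API_KEY"], "us")
```

Reading the key from the environment keeps it out of your shell history and out of any script you share.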
Launch PySpark. In your command, include the arguments as shown in the following example:
% pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf
You see something similar to the following:
Then load td_pyspark, as follows. Note the prompt symbol changes to >>>:
>>> import td_pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("td-pyspark-test")
>>> td = td_pyspark.TDSparkContextBuilder(builder).build()
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
TDSparkContextBuilder is an entry point to access td_pyspark's functionalities. As shown in the preceding code sample, you read tables in Treasure Data as data frames:
df = td.table("tablename").df()
You see a result similar to the following:
Your connection is working.
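The within("+2d/2014-10-04") argument in the session above restricts the table read to a time range. Reading it as "two days forward from 2014-10-04" is an assumption based on the td-spark time range syntax, so confirm it against the td-spark documentation. The helper name forward_range is hypothetical and only reproduces the string format used above.

```python
from datetime import date


def forward_range(days: int, start: date) -> str:
    """Build a range string of the form '+<days>d/<YYYY-MM-DD>'."""
    if days <= 0:
        raise ValueError("days must be positive")
    return f"+{days}d/{start.isoformat()}"


time_range = forward_range(2, date(2014, 10, 4))
print(time_range)  # prints "+2d/2014-10-04"

# In the PySpark shell, with td built as in the session above:
#   df = td.table("sample_datasets.www_access").within(time_range).df()
```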
Interacting with Treasure Data in your Master Node Instance
You can run select and insert queries to Treasure Data or query back data from Treasure Data. You can also create and delete databases and tables.
You can use the following commands:
Read Tables as DataFrames
To read a table, use:
df = td.table("sample_datasets.www_access").df()
df.show()
Change the Database used in Treasure Data
To change the context database, use:
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
By calling .df(), your table data is read as a Spark DataFrame. You use the DataFrame the same way as in PySpark. See also the PySpark DataFrame documentation.
df = td.table("www_access").df()
Submit Presto Queries
If your Spark cluster is small, reading all of the data into an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data that PySpark has to process.
q = td.presto("select code, * from sample_datasets.www_access")
q.show()
q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
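As a sketch of the idea above, the aggregation can run in Presto so that only the small summary reaches Spark. The function name count_by_status_code is hypothetical. The td argument is assumed to be the context object built earlier with TDSparkContextBuilder, and the only td_pyspark call used is td.presto, as shown in this section.

```python
def count_by_status_code(td, table: str = "sample_datasets.www_access"):
    """Aggregate in Presto and return the (small) result as a DataFrame."""
    # The table name is interpolated into the SQL text, so pass trusted names only.
    sql = f"select code, count(*) as cnt from {table} group by 1"
    return td.presto(sql)


# In the PySpark shell, with td built as shown earlier:
#   count_by_status_code(td).show()
```

Only one row per status code is transferred to the cluster, instead of every access log record.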
Create or drop a database
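A minimal sketch of these operations, assuming that the td context object exposes create_database_if_not_exists and drop_database_if_exists as described in the td_pyspark README (verify this against your installed version). The helper name scratch_database and the database name mydb are hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def scratch_database(td, name: str):
    """Create a database for the duration of a block, then drop it again."""
    td.create_database_if_not_exists(name)
    try:
        yield name
    finally:
        # Dropping removes the database and everything in it; use a throwaway name.
        td.drop_database_if_exists(name)


# In the PySpark shell, with td built as shown earlier:
#   with scratch_database(td, "mydb") as db:
#       ...  # work with tables in db
```

For a database that should persist, call td.create_database_if_not_exists directly and skip the drop.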
Upload DataFrames to Treasure Data
To save your local DataFrames as a table, you have two options:
- Insert the records in the input DataFrame to the target table
- Create or replace the target table with the content of the input DataFrame
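The two options can be sketched as a single helper. This assumes that the td context object provides insert_into(df, table) and create_or_replace(df, table), as described in the td_pyspark README; check the method names against your installed version. The helper name upload and the table names in the usage comment are hypothetical.

```python
def upload(td, df, table: str, replace: bool = False) -> None:
    """Save a DataFrame to a Treasure Data table."""
    if replace:
        # Create or replace the target table with the content of df.
        td.create_or_replace(df, table)
    else:
        # Insert the records in df into the existing target table.
        td.insert_into(df, table)


# In the PySpark shell, with td and df built as shown earlier:
#   upload(td, df, "mydb.tbl1")                 # append records
#   upload(td, df, "mydb.tbl2", replace=True)   # overwrite the table
```

Defaulting to insert keeps the destructive option explicit at the call site.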
Checking Amazon EMR in Treasure Data
You can use td toolbelt to check your database from a command line. Alternatively, if you have TD Console, you can check your databases and queries. Read about Database and Table Management.