Treasure Workflow provides an easy way to modify data with PySpark. You can port existing PySpark code to your Custom Scripting environment in Arm Treasure Data.
PySpark is the Python API for Apache Spark, an engine widely used for distributed computation in machine learning and general data manipulation. You can run PySpark as part of your scheduled workflows using Python custom scripts.
The following example shows how to complete basic data manipulations using PySpark in stand-alone mode with Python custom scripts.
Data Manipulation using Python Custom Scripts and PySpark
PySpark is the Python interface to Apache Spark, a unified analytics engine for large-scale data processing. This example shows how to load data from Treasure Data, process it with PySpark, run SparkSQL queries, and upload the modified data back to Treasure Data.
Example Workflow to Process Data with td-spark
The workflow that you create in this example completes the following actions (the first two are sketched in code after the list):
- Fetch data from Treasure Data
- Process data as Spark DataFrame
- Manipulate data with SparkSQL
- Upload Spark DataFrame to Treasure Data
- Upload pandas DataFrame to Treasure Data
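For orientation, fetching a table and obtaining a Spark DataFrame look roughly like this with td-pyspark. This is a minimal sketch rather than the example's actual code: the app name and API key placeholder are assumptions, while TDSparkContextBuilder and table(...).df() are td-pyspark's documented entry points.

```python
import td_pyspark
from pyspark.sql import SparkSession

# Build a Spark session that can talk to Treasure Data.
# The app name and API key placeholder are assumptions.
builder = SparkSession.builder.appName("td-spark-example")
td = td_pyspark.TDSparkContextBuilder(builder) \
    .apikey("YOUR_TD_API_KEY") \
    .build()

# Step 1: fetch data from Treasure Data.
# Step 2: the result is an ordinary Spark DataFrame.
df = td.table("sample_datasets.www_access").df()
df.printSchema()
```

The remaining steps (SparkSQL and the two uploads) are sketched in the script walkthrough at the end of this section.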
To follow the steps in this example, you must have a basic knowledge of Treasure Workflow syntax and set up your environment as follows:
- Download and install the TD Toolbelt and the TD Toolbelt Workflow module. See TD Workflow Quickstart.
- Ensure that the custom scripts feature is enabled for your TD account.
Run the Example Workflow
You retrieve code from the Treasure Data GitHub repository to run in your environment.
- Download the project from the GitHub repository
- In your Terminal window, change to the downloaded project directory
- Push and run the example workflow as follows:
$ td workflow push td-spark
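Pushing registers the project on Treasure Data but does not run it. To trigger a run immediately, you can then start the workflow. The workflow name pyspark below assumes the definition file is pyspark.dig, as described later in this section:

$ td workflow start td-spark pyspark --session now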
The workflow fetches the www_access data from the sample_datasets database, filters it to HTTP "GET" requests, and adds a time column in string format. The manipulated data is stored in the www_access_processed table in the td_spark_example database.
After the workflow runs, the www_access_processed table contains only the GET-method records, including the added string-format time column.
Additionally, the uploaded pandas DataFrame appears in the pandas_table table.
Review the Workflow Custom Python Script
Review the contents of the directory and note the following files:
Example workflow for data manipulation with PySpark.
The workflow definition file pyspark.dig invokes the Python script for manipulating data with PySpark. It has three main tasks, sketched after the list:
- The td_spark_process task creates the www_access_processed table in the td_spark_example database by using Spark DataFrame operations.
- The td_spark_sql task executes SparkSQL.
- The td_spark_upload task uploads the processed pandas DataFrame to the TD database.
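The dig file itself is not reproduced in this article, but a TD custom-scripting workflow with these three tasks typically takes the shape below. This is a sketch under assumptions: the Docker image is the one TD's custom scripting documentation commonly uses, and the py> targets use an importable module name (td_spark), since digdag's py> operator imports the script as a Python module.

```yaml
timezone: UTC

+td_spark_process:
  py>: td_spark.process_data
  docker:
    image: "digdag/digdag-python:3.9"

+td_spark_sql:
  py>: td_spark.execute_sql
  docker:
    image: "digdag/digdag-python:3.9"

+td_spark_upload:
  py>: td_spark.upload_dataframe
  docker:
    image: "digdag/digdag-python:3.9"
```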
Custom Python script with PySpark and td-pyspark.
The Python script td-spark.py houses all the Python functions invoked by the pyspark.dig file. Three main functions illustrate processing data with PySpark (a hedged sketch of all three follows the list):
- The process_data function filters the data to HTTP GET requests and adds a time column converted to string format.
- The execute_sql function demonstrates how you can use SparkSQL to process data.
- The upload_dataframe function converts a pandas DataFrame into a Spark DataFrame and uploads it to Treasure Data.
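Taken together, the three functions follow the pattern sketched below. This is a hedged outline rather than the repository's exact code: create_database_if_not_exists and create_or_replace are td-pyspark's documented write calls, while the filter expression, the time_str column name, and the sample pandas data are illustrative assumptions.

```python
import pandas as pd
import td_pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime


def _td_context():
    # Each workflow task runs in its own process, so each function
    # builds its own TD-aware Spark session. The API key placeholder
    # is an assumption; in practice it would come from workflow secrets.
    builder = SparkSession.builder.appName("td-spark-example")
    return td_pyspark.TDSparkContextBuilder(builder) \
        .apikey("YOUR_TD_API_KEY") \
        .build()


def process_data():
    td = _td_context()
    df = td.table("sample_datasets.www_access").df()
    # Keep only GET requests and add a string-formatted time column.
    processed = (df.filter(df.method == "GET")
                   .withColumn("time_str", from_unixtime(df.time)))
    td.create_database_if_not_exists("td_spark_example")
    td.create_or_replace(processed, "td_spark_example.www_access_processed")


def execute_sql():
    td = _td_context()
    df = td.table("td_spark_example.www_access_processed").df()
    # Register the DataFrame as a temporary view and query it with SparkSQL.
    df.createOrReplaceTempView("www_access_processed")
    spark = SparkSession.builder.getOrCreate()
    spark.sql(
        "SELECT path, COUNT(*) AS cnt "
        "FROM www_access_processed GROUP BY path ORDER BY cnt DESC"
    ).show(10)


def upload_dataframe():
    td = _td_context()
    spark = SparkSession.builder.getOrCreate()
    # Convert an illustrative pandas DataFrame into a Spark DataFrame,
    # then upload it through the same td-pyspark write path.
    pdf = pd.DataFrame({"name": ["a", "b"], "value": [1, 2]})
    sdf = spark.createDataFrame(pdf)
    td.create_database_if_not_exists("td_spark_example")
    td.create_or_replace(sdf, "td_spark_example.pandas_table")
```

Converting the pandas DataFrame through spark.createDataFrame before uploading is what lets the same td-pyspark write path serve both Spark-native and pandas data.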