Problem
How can a single query export results to multiple destinations in a Treasure Data workflow?
Solution
You can achieve multiple exports from a single query by separating the query execution from the result export steps using the td_result_export> operator. This approach ensures that the query is executed only once while allowing the results to be exported to multiple destinations.
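In outline, the pattern looks like this; the task, database, connection, bucket, and path names are placeholders, and a complete example follows further below:

+run_query:
  td>: queries/query.sql          # runs the query once and sets ${td.last_job_id}

+export_group:
  _parallel: true                 # both exports read the same ${td.last_job_id}

  +export_a:
    td_result_export>:
    job_id: ${td.last_job_id}
    result_connection: connection_a
    result_settings:
      bucket: bucket_a            # destination-specific settings
      path_prefix: /exports/a_
      format: csv

  +export_b:
    td_result_export>:
    job_id: ${td.last_job_id}
    result_connection: connection_b
    result_settings:
      bucket: bucket_b
      path_prefix: /exports/b_
      format: csv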
Key Components of the Solution
- Execute the query once using a standard td> task
- Export the results multiple times in parallel using multiple td_result_export> tasks
- Use parallel execution to ensure all export tasks reference the same job ID
How It Works
- The initial td> task executes your SQL query
- Treasure Data stores the results temporarily and makes the job ID available via ${td.last_job_id}
- Multiple td_result_export> tasks run in parallel, each referencing the same job ID
- Each export task sends the results to a different destination
Important: Running the export tasks in parallel (_parallel: true) ensures they all access the original query's job ID before any export task completes and potentially overwrites the ${td.last_job_id} variable.
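To make the pitfall concrete, here is a sketch of the sequential anti-pattern (no _parallel: true). If, as noted above, each export job also updates ${td.last_job_id}, the second export ends up pointing at the first export's job rather than the original query; the connection, bucket, and path names are placeholders:

# Anti-pattern: sequential exports without _parallel: true
+export_first:
  td_result_export>:
  job_id: ${td.last_job_id}       # job ID of the query - correct
  result_connection: your_auth_connection_name_1
  result_settings:
    bucket: your_s3_bucket
    path_prefix: /your/path/first_
    format: csv

+export_second:
  td_result_export>:
  job_id: ${td.last_job_id}       # may now be the job ID of +export_first, not the query
  result_connection: your_auth_connection_name_2
  result_settings:
    bucket: your_s3_bucket
    path_prefix: /your/path/second_
    format: csv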
Example Workflow (.dig) File
# Example .dig file demonstrating how to export the results of a single query
# execution to multiple destinations using td_result_export> and parallelism

timezone: UTC

_export:
  td:
    # --- Configuration ---
    # Replace 'your_database_name' with the actual Treasure Data database name
    database: your_database_name
    # --- End Configuration ---

# Task 1: Execute the main query
# This task runs the query defined in 'queries/query.sql'.
# The results are stored temporarily, associated with a job ID available in ${td.last_job_id}.
+run_main_query:
  td>: queries/query.sql   # Ensure 'queries/query.sql' exists in your workflow project
  database: ${td.database}
  # Optionally specify engine (presto or hive), priority, etc.
  # engine: presto
  # priority: 0

# (Optional) Task 2: Display the job ID
# Shows the ID of the job executed by '+run_main_query'. Useful for debugging.
+show_job_id:
  echo>: "Main query executed. Job ID: ${td.last_job_id}"

# Task 3: Export results to multiple destinations IN PARALLEL
# The _parallel: true setting is ESSENTIAL here. It ensures all sub-tasks
# start with the same value of ${td.last_job_id} (from +run_main_query)
# before any export task can overwrite it.
+result_export_group:
  _parallel: true

  # Sub-task 3.1: Export to destination 1
  # Uses td_result_export> with the job ID from +run_main_query.
  +export_to_destination_1:
    td_result_export>:
    # Use the job ID from the query execution task
    job_id: ${td.last_job_id}
    # --- Configuration for Destination 1 ---
    # Replace 'your_auth_connection_name_1' with your TD Authentication name
    result_connection: your_auth_connection_name_1
    result_settings:
      # Replace 'your_s3_bucket' and the path prefix as needed
      bucket: your_s3_bucket
      path_prefix: /your/path/prefix/results_${moment(session_time).format("YYYYMMDD")}_dest1_
      # Specify format and other S3-specific settings
      format: csv
      compression: none       # or gzip, etc.
      header: true
      delimiter: ','          # or \t, etc.
      null_value: empty       # or NULL, etc.
      newline: LF             # or CRLF
      quote_policy: MINIMAL   # or ALL, NONE
      escape: '"'
      quote: '"'
      # part_size: 20         # Optional: control part size for multipart uploads
    # --- End Configuration for Destination 1 ---

  # Sub-task 3.2: Export to destination 2
  # Also uses td_result_export> with the *same* job_id, thanks to parallelism.
  +export_to_destination_2:
    td_result_export>:
    # Use the same job ID from the query execution task
    job_id: ${td.last_job_id}
    # --- Configuration for Destination 2 ---
    # Replace 'your_auth_connection_name_2' (can be the same or a different connection)
    result_connection: your_auth_connection_name_2
    result_settings:
      # Example: different path and format for the second destination
      bucket: your_s3_bucket
      path_prefix: /your/other_path/data_${moment(session_time).format("YYYYMMDD")}_dest2_
      format: parquet      # Example: exporting as Parquet
      compression: snappy  # Example: different compression
      # Add other Parquet-specific settings if needed
    # --- End Configuration for Destination 2 ---

  # Add more +export_to_destination_N tasks here if needed...
Detailed Explanation
Step 1: Run the Main Query
The +run_main_query task executes your SQL query once and sets the ${td.last_job_id} variable with the unique job ID for this execution.
Step 2: Group Export Tasks with Parallelism
The _parallel: true setting on +result_export_group is crucial: it ensures all sub-tasks start concurrently, so each reads the same value of ${td.last_job_id} as set by the query task.
Step 3: Configure Multiple Export Destinations
Each export task (+export_to_destination_1, +export_to_destination_2, etc.) can be configured with its own combination of the following; a hypothetical third destination is sketched after this list:
- Authentication connections
- Destination buckets or paths
- File formats (CSV, Parquet, etc.)
- Compression methods
- Other format-specific settings
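For instance, a hypothetical third destination could reuse the same job ID while producing a gzip-compressed CSV copy. It would be added under +result_export_group alongside the other two sub-tasks (indent accordingly); the connection, bucket, and prefix names are placeholders:

+export_to_destination_3:
  td_result_export>:
  # Same job ID as the other exports, read before any export job overwrites it
  job_id: ${td.last_job_id}
  result_connection: your_auth_connection_name_3
  result_settings:
    bucket: your_s3_bucket
    path_prefix: /your/archive_path/results_${moment(session_time).format("YYYYMMDD")}_dest3_
    format: csv
    compression: gzip      # compressed copy, e.g., for archival
    header: true
    delimiter: ','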
Best Practices
- Always use _parallel: true for the export group to prevent job ID conflicts
- Include descriptive task names to make workflow logs easier to understand
- Consider using dynamic path naming with variables like ${moment(session_time).format("YYYYMMDD")} (a variation using a built-in session variable is sketched after this list)
- For debugging, the optional +show_job_id task can help verify that the correct job ID is being used
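As a variation on the dynamic path naming tip, Digdag's built-in ${session_date_compact} variable (the session date formatted as yyyyMMdd) can stand in for the moment() expression; the connection, bucket, and prefix below are placeholders:

+export_with_session_date:
  td_result_export>:
  job_id: ${td.last_job_id}
  result_connection: your_auth_connection_name_1
  result_settings:
    bucket: your_s3_bucket
    # ${session_date_compact} resolves to the session date, e.g. 20240101
    path_prefix: /your/path/prefix/results_${session_date_compact}_dest1_
    format: csv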
Use Cases
This pattern is particularly useful when you need to:
- Export the same data in different formats (CSV for human readability, Parquet for downstream processing)
- Send results to multiple systems or teams simultaneously
- Create backup copies of query results in different locations
- Apply different naming conventions or compression settings per destination