You can specify incremental transfers of data and control when data transfers start as part of managing your data pipeline. Use TD Workflows to ensure query processing steps in Arm Treasure Data run only after the necessary data has been ingested into your account.
- Basic knowledge of Treasure Data, including the toolbelt
- Ability to create a data connection and input transfer. Refer to Input Transfer.
Depending on the incremental transfer method you choose, you can either reference a data connector created in the console or reference a data connector’s configuration file (a .yml file).
In some cases, you’ll want to import only the most recently added or modified records. You might want to run incremental ingest when:
- Tables are too large to regularly re-import the entire table (for example, from large production databases)
- You are running frequent imports of updated data (for example, every 15 minutes) to keep the data as fresh as possible
- You want to minimize the number of rows ingested, to make the most efficient use of your Treasure Data account plan’s capacity
Determine your approach
Within TD Workflow, you can set up incremental input transfers in two ways:
|Approach||Use||Do Not Use|
|Reference an Input Data Transfer||When you are handling incremental processing logic from IDs or file names, such as using column names instead of a timestamp. Or, for example, when you want to keep track of the ID of the last record, and then pull from the next record onward at the next ingestion event. When you are not ingesting data on a regular basis, or want the system to be more “self-healing” such as when you have a single failure event. With self-healing, if an ingest fails, but the next succeeds, the ‘diff’ calculated will be from the last successful ingestion occurrence.||If the system from which you’re pulling batch incremental loads into Treasure Data has “late arriving data.” For example, certain SaaS analytic tools make events available through API, but do not guarantee that events become available for querying in the order the events arrive. In this case, the default logic of this approach may miss earlier data that arrives late.|
|Create a custom Data Transfer configuration||Only when the incremental flow is based on a date. When you need flexibility in the timing. For example, when the system you’re pulling data from has late arriving data.||You cannot use this approach if you are trying to run incremental processing based on a non-date column in the data set you’re ingesting from.|
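The late-arriving-data trade-off between the two approaches can be illustrated with a small simulation. This is only a sketch under stated assumptions: the event records, the `available_at` field, and the helper names `cursor_pull` and `window_pull` are hypothetical and are not part of any Treasure Data connector. The cursor-style pull stands in for the default incremental logic, and the window-style pull stands in for a date-based configuration where you control the time range yourself.

```python
from datetime import datetime, timedelta

# Hypothetical source events. Event "b" happened at 10:30 but only
# became queryable at 12:30, after event "c" was already available.
events = [
    {"id": "a", "event_time": datetime(2024, 1, 1, 10, 0),
     "available_at": datetime(2024, 1, 1, 10, 5)},
    {"id": "b", "event_time": datetime(2024, 1, 1, 10, 30),
     "available_at": datetime(2024, 1, 1, 12, 30)},
    {"id": "c", "event_time": datetime(2024, 1, 1, 11, 0),
     "available_at": datetime(2024, 1, 1, 11, 5)},
]


def visible(now):
    """Events the source exposes at time `now`."""
    return [e for e in events if e["available_at"] <= now]


def cursor_pull(now, cursor):
    """Pull only events newer than the stored cursor (max event_time seen)."""
    rows = [e for e in visible(now) if cursor is None or e["event_time"] > cursor]
    new_cursor = max((e["event_time"] for e in rows), default=cursor)
    return rows, new_cursor


def window_pull(now, lookback):
    """Pull every visible event whose event_time falls in [now - lookback, now)."""
    start = now - lookback
    return [e for e in visible(now) if start <= e["event_time"] < now]


# Two scheduled runs, at 12:00 and 13:00.
run1, cursor = cursor_pull(datetime(2024, 1, 1, 12, 0), None)
run2, cursor = cursor_pull(datetime(2024, 1, 1, 13, 0), cursor)
cursor_ids = {e["id"] for e in run1 + run2}

window_ids = {e["id"] for e in window_pull(datetime(2024, 1, 1, 13, 0), timedelta(hours=3))}

print(sorted(cursor_ids))  # the late event "b" is never picked up
print(sorted(window_ids))  # the overlapping window includes "b"
```

Note that the overlapping window re-reads events "a" and "c", so a date-window approach like this one would typically need a deduplication step downstream.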
The concept of timing, scheduling, and incremental processing
Incremental processing works by tracking the value of a column in the records being imported (for example, an ID column), and then using the highest value imported during the last ingest as the starting point for the subsequent ingest.
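As a rough illustration of this idea, the following sketch tracks the highest `id` seen and uses it as the starting point for the next run. The function name `incremental_ingest` and the row layout are assumptions made for the example; the actual connectors manage this state internally.

```python
from typing import Optional


def incremental_ingest(source_rows, last_id: Optional[int] = None):
    """Return rows with id greater than last_id, plus the new highest id."""
    new_rows = [r for r in source_rows if last_id is None or r["id"] > last_id]
    # If nothing new was found, keep the previous high-water mark.
    new_last = max((r["id"] for r in new_rows), default=last_id)
    return new_rows, new_last


source = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]

# First ingest: no stored value yet, so every row is imported.
batch1, mark = incremental_ingest(source)

# A new record appears in the source before the next ingest.
source.append({"id": 3, "v": "c"})

# Subsequent ingest: starts after the highest id imported last time.
batch2, mark = incremental_ingest(source, mark)
```

Here `batch1` holds the rows with ids 1 and 2, while `batch2` holds only the row with id 3.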
Refer to the following diagram:
In the diagram, the from_date value (which can be last_fetched_date or any other incremental field) is updated and stored after each execution. The new value is used during the next run.
For example, using the Mixpanel data connector:
- During the first run, you import all data.
- For subsequent incremental runs, you use the last_fetched_time (which is the maximum ingestion timestamp from the previous run).
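The first-run and subsequent-run pattern, together with the stored value being updated after each execution, might be sketched as follows. This is not the Mixpanel connector's implementation: `run_transfer`, the `state` dictionary, the numeric `ts` field, and the two fetch functions are hypothetical stand-ins. The sketch also assumes the stored value advances only when a run succeeds, which is how a failed run followed by a successful one can resume from the last successful ingest.

```python
def run_transfer(state, fetch):
    """Run one transfer; advance the stored state only when the fetch succeeds."""
    # None on the first run means "import everything".
    since = state.get("last_fetched_time")
    try:
        rows = fetch(since)
    except RuntimeError:
        # Failed run: leave the stored value untouched.
        return []
    if rows:
        state["last_fetched_time"] = max(r["ts"] for r in rows)
    return rows


data = [{"ts": 100}, {"ts": 200}]


def fetch_ok(since):
    """Return records with a timestamp later than `since`."""
    return [r for r in data if since is None or r["ts"] > since]


def fetch_broken(since):
    """Simulate a source outage."""
    raise RuntimeError("source unavailable")


state = {}
first = run_transfer(state, fetch_ok)        # first run: imports all data
data.append({"ts": 300})
failed = run_transfer(state, fetch_broken)   # this run fails, state stays at 200
data.append({"ts": 400})
recovered = run_transfer(state, fetch_ok)    # resumes after 200: gets 300 and 400
```

Because the stored value is not advanced on failure, the record with `ts` 300 is not lost; it is picked up by the next successful run.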
How to configure a connection for incremental transfer
Considering the preceding information, select your approach to set up incremental input transfers within TD Workflow:
- Reference an Input Data Transfer created within the Treasure Data Console
- Create a custom Data Transfer configuration as part of a data connector configuration file (.yml)