The data connector for Amazon S3 enables you to import data from JSON, TSV, and CSV files stored in an S3 bucket.
For sample workflows that import data from files stored in an S3 bucket, go to the Treasure Box on GitHub.
Prerequisites
- Basic knowledge of Arm Treasure Data
Use the TD Console to create your connection
You can use the TD Console to create your data connector.
Create a new connection
When you configure a data connection, you provide authentication to access the integration. In Treasure Data, you configure the authentication and then specify the source information.
Go to Catalog, then search for and select AWS S3. Click Create.
Using Credentials to Authenticate
You need an access key ID and a secret access key to authenticate using credentials.
Register your credentials by setting the following parameters.
- Endpoint: the S3 endpoint (Ex. s3-ap-northeast-1.amazonaws.com). You can find region and endpoint information in the AWS documentation.
- Authentication Method:
- basic: uses access_key_id and secret_access_key to authenticate. See here.
- Access Key ID
- Secret access key
- anonymous: uses anonymous access. This auth method can access only public files.
- session: uses a temporarily generated access_key_id, secret_access_key, and session_token. (This authentication method is available only for data import; it cannot currently be used for data export.)
- Access Key ID
- Secret access key
- Secret token
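For reference, these console settings map to connector configuration keys that also appear later in this article (endpoint, auth_method, access_key_id, secret_access_key, session_token). A minimal sketch of the basic authentication case, using placeholder values, looks roughly like this:

in:
  type: s3
  endpoint: s3-ap-northeast-1.amazonaws.com
  auth_method: basic
  access_key_id: XXXXXXXXXX        # placeholder
  secret_access_key: YYYYYYYYYY    # placeholder

For the session method, session_token is added as well, as shown in Appendix D.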
Click Continue. Name your new AWS S3 connection. Click Done.
Transfer data from AWS S3
Next, create a New Source from the Authentications page. You can prepare an ad hoc data connector job or a scheduled data connector job. This section covers the following four steps.
Fetch from
You need to register the information that you want to ingest.
- Bucket: provide the S3 bucket name (Ex. your_bucket_name)
- Path Prefix: specify a prefix for target keys. (Ex. logs/data_)
- Path Regex: use a regular expression to match file paths. If a file path doesn’t match the specified pattern, the file is skipped. For example, if you specify the pattern .csv$, a file is skipped if its path doesn’t end with .csv.
- Skip Glacier Objects: select to skip processing objects stored in Amazon Glacier storage class. If objects are stored in Glacier storage class, but this option is not checked, an exception is thrown.
- Filter by Modified Time: choose how to filter files for ingestion:
- If it is unchecked (default):
- Start after path: inserts the last_path parameter so that the first execution skips files that come before the specified path. (Ex. logs/data_20170101.csv)
- Incremental: enables incremental loading. If incremental loading is enabled, the config diff for the next execution includes the last_path parameter so that the next execution skips files before that path. Otherwise, last_path is not included.
- If it is checked:
- Modified after: inserts the last_modified_time parameter so that the first execution skips files that were modified before the specified timestamp. (Ex. 2019-06-03T10:30:19.806Z)
- Incremental by Modified Time: enables incremental loading by modified time. If incremental loading is enabled, the config diff for the next execution includes the last_modified_time parameter so that the next execution skips files that were modified before that time. Otherwise, last_modified_time is not included.
You can limit access to your S3 bucket/IAM user by using a list of static IPs. Contact support@treasuredata.com if you need this.
There are instances where you might need to scan all the files in a directory (such as from the top-level directory "/"). In such instances, you must use the CLI to do the import (see the Appendix for instructions on importing using the CLI).
Example: CloudFront
Amazon CloudFront is a web service that speeds up distribution of your static and dynamic web content. You can configure CloudFront to create log files that contain detailed information about every user request that CloudFront receives. If you enable logging, CloudFront log files are saved with names such as the following:
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.a103fd5a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.b2aede4a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.594fa8e6.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.d12f42f9.gz]
In this case, “Fetch from” settings are as shown:
- Bucket: your_bucket
- Path Prefix: logging/
- Path Regex: .gz$ (Not Required)
- Start after path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz (Assuming that you want to import the log files from 2017-04-23-16.)
- Incremental: true (if you want to schedule this job.)
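Expressed as a connector configuration fragment, the same example looks roughly like the following sketch. The regex key name (path_match_pattern) is an assumption and is not confirmed by this article; the other keys appear in the Appendix.

in:
  type: s3
  bucket: your_bucket
  path_prefix: logging/
  path_match_pattern: .gz$    # assumed key name for "Path Regex"
  last_path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz
  incremental: true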
Preview
In this section, you can see a preview of the data you configured. If you cannot reach the Preview page, errors might occur when you try to proceed with the import using the TD Console. See Preview for more information.
If you want to adjust column names or other guessed properties, choose Advanced Settings.
Advanced Settings
Advanced Settings allow you to edit the guessed properties. Edit the following settings as needed.
- Default timezone: changes the time zone of timestamp columns if the value itself doesn’t include time zone.
- Columns:
- Name: changes the name of the column. Column names may contain only lowercase letters, numbers, and underscores (“_”).
- Type: parses values as the specified type and then stores them after conversion to the Treasure Data schema.
- boolean
- long
- timestamp: is imported as string type in Treasure Data (Ex. 2017-04-01 00:00:00.000)
- double
- string
- json
- Total file count limit: maximum number of files to read. (optional)
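These settings correspond to the parser section of the generated configuration (see the load.yml example in the Appendix). As a rough sketch, with illustrative column names, renaming a column and fixing its type looks like this:

parser:
  default_timezone: 'UTC'
  columns:
  - name: purchase_id        # renamed column (illustrative)
    type: long
  - name: created_at
    type: timestamp
    format: '%Y-%m-%d %H:%M:%S'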
Transfer to
In this phase, select the target database and table into which you want to import data.
- Mode: Append/Replace
- Partition Key Seed: choose a long or timestamp column as the partitioning time. By default, upload_time is used as the time column via the add_time filter.
When
In this phase, you can set an ad hoc or schedule configuration for your job.
- When
- Once now: sets a one-time job.
- Repeat…
- Schedule: accepts @hourly, @daily, @monthly, or a custom cron expression.
- Delay Transfer: adds a delay to the execution time.
- TimeZone: supports extended timezone formats like ‘Asia/Tokyo’.
My Input Transfers
Your data connector jobs are listed on the jobs page of the TD Console.
Scheduled Execution
You can schedule periodic data connector execution for incremental S3 file import. We configure our scheduler carefully to ensure high availability.
For the scheduled import, the Data Connector for Amazon S3 initially imports all files that match the specified prefix (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) and then remembers one of the following fields, depending on the configuration:
- If use_modified_time is disabled, the last path (path/to/sample_201505.csv.gz) is saved for the next execution. On the second and subsequent runs, the connector imports only files that come after the last path in alphabetical (lexicographic) order (path/to/sample_201506.csv.gz, …).
- Otherwise, the time that the job is executed (e.g. 2019-06-17T12:00:00.000Z) is saved for the next execution. On the second and subsequent runs, the connector imports only files that were modified after that execution time.
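For example, the saved field appears in the configuration for the next run roughly as follows (a sketch showing only the relevant keys):

in:
  type: s3
  bucket: sample_bucket
  path_prefix: path/to/sample_
  incremental: true
  last_path: path/to/sample_201505.csv.gz
  # or, when filtering by modified time:
  # last_modified_time: "2019-06-17T12:00:00.000Z"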
Create a Schedule using the TD Toolbelt
A new schedule can be created using the td connector:create command.
$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml
It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time (see also data partitioning).
$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml \
    --time-column created_at
The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.
By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
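For example, to run the same schedule at 00:10 Tokyo time, the command might look like this (a sketch; the schedule name daily_import_jst is illustrative):

$ td connector:create daily_import_jst "10 0 * * *" \
    td_sample_db td_sample_table load.yml \
    --time-column created_at --timezone "Asia/Tokyo"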
List All Schedules
You can see the list of currently scheduled entries by running the command td connector:list.
$ td connector:list
+--------------+------------+----------+-------+--------------+-----------------+--------------------------------------------+
| Name         | Cron       | Timezone | Delay | Database     | Table           | Config                                     |
+--------------+------------+----------+-------+--------------+-----------------+--------------------------------------------+
| daily_import | 10 0 * * * | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"s3", "access_key_id"...   |
+--------------+------------+----------+-------+--------------+-----------------+--------------------------------------------+
Show Schedule Settings And History
td connector:show shows the execution setting of a schedule entry.
% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  endpoint: s3.amazonaws.com
  bucket: sample_bucket
  path_prefix: path/to/sample_
  parser:
    charset: UTF-8
  ...
td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, use td job <jobid>.
% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set
Delete Schedule
td connector:delete removes the schedule.
$ td connector:delete daily_import
FAQ for the S3 Data Connector
Q: My Data Connector for S3 job runs for a long time. What can I do?
- Check the number of S3 files that your connector job is ingesting. If there are more than 10,000 files, performance degrades. To mitigate this issue, you can:
- Narrow the path_prefix option to reduce the number of S3 files.
- Set the min_task_size option to 268,435,456 (256 MB), as sketched below.
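When running the job from the CLI, min_task_size can be set in the in: section of the configuration file, roughly as in the sketch below (the placement shown is an assumption based on the other in: options in this article):

in:
  type: s3
  bucket: sample_bucket
  path_prefix: path/to/sample_
  min_task_size: 268435456    # 256 MB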
Q: How do I troubleshoot data import problems?
Review the job log. Warnings and errors provide information about the success of your import. For example, you can identify the source file names associated with import errors.
Appendix
A) Optional Alternative: Use the CLI to Configure the Connector
Before setting up the connector, install the ‘td’ command by installing the most current Treasure Data Toolbelt.
Create Seed Config File (seed.yml)
Prepare seed.yml as shown in the following example, with your AWS access key and secret access key. You must also specify the bucket name and source file name (or a prefix for multiple files).
in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  # path to the *.json or *.csv or *.tsv file on your s3 bucket
  path_prefix: path/to/sample_file
out:
  mode: append
The Data Connector for Amazon S3 imports all files that match the specified prefix. (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)
Using a path_prefix with a leading '/' can lead to unintended results. For example, "path_prefix: /path/to/sample_file" would result in the plugin looking for the file in s3://sample_bucket//path/to/sample_file, which on S3 is different from the intended path s3://sample_bucket/path/to/sample_file.
For more details on available out modes, see Appendix.
Guess Fields (Generate load.yml)
Use connector:guess. This command automatically reads the source files and assesses (uses logic to guess) the file format and its fields/columns.
$ td connector:guess seed.yml -o load.yml
If you open up load.yml, you’ll see the assessed file format definitions including file formats, encodings, column names, and types.
in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append
Then, you can see a preview of the data using the td connector:preview command.
$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. | David    | 2015-03-31 06:12:37 |
| 20313 | BB Imc. | Tom      | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. | Cesar    | 2015-04-02 05:12:32 |
| 93133 | EE Inc. | Jake     | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+
The guess command needs more than 3 rows and 2 columns in the source data file, because it assesses the column definitions using sample rows from the source data.
If a column name or column type is not detected as you expect, modify load.yml directly and preview again.
Currently, the Data Connector supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.
The `preview` command downloads one file from the specified bucket and displays the results from that file. This may cause the results of the preview and issue commands to differ. See Preview for more information.
Execute Load Job
Submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.
It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time (see data partitioning). If the option is not provided, the data connector chooses the first long or timestamp column as the partitioning time. The type of the column specified by --time-column must be either long or timestamp.
If your data doesn’t have a time column, you can add one by using the add_time filter option. For more details, see the add_time filter plugin.
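As a rough sketch of what that might look like in load.yml, a filters section could be added between in: and out:. The exact option names should be checked against the add_time filter plugin documentation; the structure below is an assumption:

in:
  ...
filters:
  - type: add_time
    to_column:
      name: time             # time column to create
      type: timestamp
    from_column:
      name: created_at       # existing timestamp column to copy from (illustrative)
out:
  mode: append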
$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
    --time-column created_at
The connector:issue command assumes that you have already created a database (td_sample_db) and a table (td_sample_table). If the database or the table does not exist in TD, this command will not succeed, so create the database and table manually or use the --auto-create-table option with the td connector:issue command to create them automatically:
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table
The data connector does not sort records on the server side. To use time-based partitioning effectively, sort records in files beforehand.
If you have a field called time, you don’t have to specify the --time-column option.
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
B) Modes for out plugin
You can specify the file import mode in the out section of seed.yml.
append (default)
This is the default mode and records are appended to the target table.
in:
  ...
out:
  mode: append
replace
This mode replaces data in the target table. Any manual schema changes made to the target table remain intact with this mode.
in:
  ...
out:
  mode: replace
C) IAM Permissions
The IAM credentials specified in the YML configuration file and used for the connector:guess and connector:issue commands must be granted permissions for the AWS S3 resources that they need to access. If the IAM user does not have these permissions, configure the user with one of the predefined Policy Definitions or create a new Policy Definition in JSON format.
The following example is based on the Policy Definition reference format and gives the IAM user read-only permission (through the GetObject and ListBucket actions) for your-bucket:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::your-bucket", "arn:aws:s3:::your-bucket/*" ] } ] }
Replace your-bucket with the actual name of your bucket.
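If you manage IAM with the AWS CLI, one way to attach such a policy as an inline user policy is sketched below; the user name, policy name, and policy.json file are placeholders:

$ aws iam put-user-policy \
    --user-name td-s3-import \
    --policy-name td-s3-read-only \
    --policy-document file://policy.json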
D) Use AWS Security Token Service (STS) as temporary credentials provider
In certain cases, IAM basic authentication through access_key_id and secret_access_key might be too risky (although the secret_access_key is never shown in clear text when a job is executed or after a session is created).
The S3 data connector can use Temporary Security Credentials provided by the AWS Security Token Service (STS). Using AWS STS, any IAM user can use their own access_key_id and secret_access_key to create a temporary set of new_access_key_id, new_secret_access_key, and session_token keys with an associated expiration time, after which the credentials become invalid.
There are essentially two types of Temporary Security Credentials:
- Session Token
The simplest Security Credentials, with an associated expiration time. The temporary credentials give access to all resources that the original IAM credentials used to generate them had. These credentials are valid as long as they are not expired and the permissions of the original IAM credentials don’t change.
- Federation Token
Adds an extra layer of permission control over the Session Token above. When generating a Federation Token, the IAM user is required to specify a Permission Policy definition. The scope can be used to further narrow down which of the resources, accessible to the IAM user, the bearer of the Federation Token should get access to. Any Permission Policy definition can be used but the scope of the permission is limited to only all or a subset of the permissions the IAM user used to generate the token had. As for the Session Token, the Federation Token credentials are valid as long as they are not expired and the permissions associated to the original IAM credentials don’t change.
AWS STS Temporary Security Credentials can be generated using the AWS CLI or the AWS SDK in the language of your choice.
Session Token
$ aws sts get-session-token --duration-seconds 900
{
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T05:11:14Z",
        "AccessKeyId": "XXXXXXXXXX"
    }
}
Federation Token
$ aws sts get-federation-token --name temp_creds --duration-seconds 900 \
    --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'
{
    "FederatedUser": {
        "FederatedUserId": "523683666290:temp_creds",
        "Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
    },
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T06:06:17Z",
        "AccessKeyId": "XXXXXXXXXX"
    },
    "PackedPolicySize": 16
}
where:
- temp_creds is the name of the federated token/user
- bucketname is the name of the bucket to give access to. Refer to the ARN specification for more details.
- s3:GetObject and s3:ListBucket are the basic read operations for an AWS S3 bucket.
AWS STS credentials cannot be revoked. They remain effective until they expire, or until you delete the original IAM user used to generate the credentials or remove its permissions.
When your Temporary Security Credentials are generated, copy the SecretAccessKey, AccessKeyId, and SessionToken into your seed.yml file as follows:
in:
  type: s3
  auth_method: session
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  session_token: ZZZZZZZZZZ
  bucket: sample_bucket
  path_prefix: path/to/sample_file
Then execute the Data Connector for S3 as usual.
Because STS credentials expire after the specified amount of time, a data connector job that uses them might eventually start failing once the credentials expire. Currently, if the STS credentials are reported as expired, the data connector job retries up to the maximum number of times (5) and eventually completes with an 'error' status.
E) IP Whitelist and VPC
You can access the list of IP addresses that TD uses here.
Determine the region of your TD Console from the URL you use to log in to TD, then refer to the "Data Connector" entry for your region at the URL above.
Region of TD Console | URL |
US | https://console.treasuredata.com |
Tokyo | https://console.treasuredata.co.jp |
EU01 | https://console.eu01.treasuredata.com |