Ingest data from S3 buckets
Use a CREATE SOURCE or CREATE TABLE statement with the S3 connector to connect RisingWave to an Amazon S3 source. RisingWave supports the CSV, ndjson, and Parquet file formats.
The S3 connector does not guarantee that files are read sequentially or that each file is read in full.
Syntax
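The general form of the statement is sketched below as a grammar outline. It is reconstructed from the connector parameters, encode properties, and additional columns described in this section. Square brackets mark optional parts, and names such as source_name and connector_parameter are placeholders. Treat the exact clause order as an assumption and check it against the CREATE SOURCE reference for your RisingWave version.

```sql
-- Grammar outline, not a runnable statement.
CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
[ INCLUDE { file | offset } [ AS column_name ] ]
WITH (
   connector = 's3',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
   without_header = 'true' | 'false',
   delimiter = 'delimiter'
);
```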
For CSV data, specify the delimiter in the delimiter option of the ENCODE properties.
schema_definition:
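A sketch of the column list, following the usual CREATE TABLE pattern. column_name and data_type are placeholders, and the two ways of declaring a primary key shown here are an assumption to verify against the RisingWave reference.

```sql
-- Grammar outline, not a runnable statement.
(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)
```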
Parameters
Field | Notes |
---|---|
connector | Required. Must be s3. |
s3.region_name | Required. The service region. |
s3.bucket_name | Required. The name of the bucket the data source is stored in. |
s3.credentials.access | Required. The AWS access key ID. |
s3.credentials.secret | Required. The AWS secret access key. |
s3.endpoint_url | Conditional. The host URL of an S3-compatible object storage server. Set it to use such a server instead of the standard Amazon S3 endpoint. |
compression_format | Optional. The compression format of the files being read. It can be defined in the CREATE TABLE statement. When set to gzip or gz, the file reader reads all files with the .gz suffix. When set to None or left undefined, the file reader automatically reads and decompresses .gz and .gzip files. |
match_pattern | Conditional. Finds the object keys in s3.bucket_name that match the given pattern. Standard Unix-style glob syntax is supported. |
s3.assume_role | Optional. The ARN of an IAM role to assume when accessing S3. This provides temporary, secure access to S3 resources without sharing long-term credentials. |
refresh.interval.sec | Optional. The interval, in seconds, between file-listing operations. It determines how long it takes to discover new files. Default: 60. |
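To illustrate the conditional and optional parameters, the following sketch reads gzip-compressed JSON files from an S3-compatible server. The table name, columns, region, bucket, endpoint URL, and credentials are all hypothetical placeholders. Drop s3.endpoint_url when reading from Amazon S3 itself.

```sql
-- Hypothetical names and credentials; replace with your own.
CREATE TABLE compressed_events (
    event_id INT,
    message VARCHAR,
    PRIMARY KEY (event_id)
)
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key',
    -- Only needed for S3-compatible storage; omit for Amazon S3.
    s3.endpoint_url = 'https://storage.example.com',
    -- Read files with the .gz suffix.
    compression_format = 'gzip'
) FORMAT PLAIN ENCODE JSON;
```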
Empty cells in CSV files are parsed as NULL.
Field | Notes |
---|---|
data_format | Supported data format: PLAIN. |
data_encode | Supported data encodings: CSV, JSON, and PARQUET. |
without_header | Applies to CSV encoding only. Indicates whether the files lack a header line: with true, the first line is read as data; with false, it is treated as a header. Accepted values: true, false. Default: true. |
delimiter | How RisingWave splits the contents. For JSON encoding, the delimiter is \n (newline); for CSV encoding, it can be a comma (','), a semicolon (';'), or a tab (E'\t'). |
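As a sketch of how these properties combine, the statement below reads tab-separated files that begin with a header row. It assumes, as the option name suggests, that without_header = 'false' means each file starts with a header. The table name, columns, bucket, and credentials are hypothetical.

```sql
CREATE TABLE tsv_table (
    id INT,
    name VARCHAR,
    age INT,
    PRIMARY KEY (id)
)
WITH (
    connector = 's3',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE CSV (
    -- 'false': assumed to mean the first line of each file is a header row.
    without_header = 'false',
    -- Tab-separated values.
    delimiter = E'\t'
);
```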
Additional columns
Field | Notes |
---|---|
file | Optional. Contains the name of the file the current record comes from. |
offset | Optional. Contains the byte offset (the record offset for Parquet files) at which the current message begins. |
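A minimal sketch that requests both additional columns so each row records where it came from. The aliases file_name and file_offset are hypothetical, and placing the INCLUDE clauses after the column list and before WITH is an assumption to check against your version.

```sql
CREATE TABLE s3_with_metadata (
    id INT,
    payload VARCHAR
)
INCLUDE file AS file_name      -- hypothetical alias for the file column
INCLUDE offset AS file_offset  -- hypothetical alias for the offset column
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE JSON;

-- Inspect which file and position each record was read from.
SELECT id, file_name, file_offset FROM s3_with_metadata;
```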
Examples
Here are examples of connecting RisingWave to an S3 source to read data from individual streams.
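The sketch below shows one table per supported non-CSV encoding: newline-delimited JSON and Parquet. Table names, columns, region, buckets, and credentials are hypothetical placeholders.

```sql
-- Newline-delimited JSON files.
CREATE TABLE json_table (
    id INT,
    name VARCHAR,
    age INT,
    PRIMARY KEY (id)
)
WITH (
    connector = 's3',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-json-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE JSON;

-- Parquet files.
CREATE TABLE parquet_table (
    id INT,
    name VARCHAR,
    age INT
)
WITH (
    connector = 's3',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-parquet-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE PARQUET;
```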
Important considerations
Object filtering in S3 buckets
RisingWave has a prefix argument for filtering objects in the S3 bucket. It relies on Apache OpenDAL, whose prefix filter implementation is expected to be released soon.
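Separately from the prefix argument, the match_pattern parameter selects object keys with a Unix-style glob. A sketch, assuming a hypothetical bucket in which only ndjson files directly under logs/2024/ should be ingested; the source name, columns, and credentials are placeholders.

```sql
CREATE SOURCE logs_2024 (
    ts TIMESTAMP,
    level VARCHAR,
    message VARCHAR
)
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-log-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key',
    -- Glob applied to object keys, e.g. logs/2024/day1.ndjson matches.
    match_pattern = 'logs/2024/*.ndjson'
) FORMAT PLAIN ENCODE JSON;
```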
Handle new files in the bucket
RisingWave automatically ingests new files added to the bucket. However, if a file is deleted and a new file with the same name is added at the same time, RisingWave does not detect the change. RisingWave also ignores file deletions.
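New files are discovered when RisingWave lists the bucket, and refresh.interval.sec controls how often that listing happens. The sketch below shortens the interval from the 60-second default to 10 seconds. The value and all names are hypothetical, and passing the interval as a quoted string is an assumption. A shorter interval means more frequent list requests against the bucket.

```sql
CREATE SOURCE fast_discovery (
    id INT,
    name VARCHAR
)
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key',
    -- List the bucket every 10 seconds instead of the default 60.
    refresh.interval.sec = '10'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);
```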
Read data from the source
To read the data, either create a materialized view from the source or create a table with the S3 connector.
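Both approaches are sketched below with hypothetical object names, bucket, and credentials. The first defines a source and materializes it. The second defines a table that ingests through the connector directly.

```sql
-- Option 1: a source plus a materialized view that reads from it.
CREATE SOURCE s3_source (
    id INT,
    name VARCHAR,
    age INT
)
WITH (
    connector = 's3',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-s3-source',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);

CREATE MATERIALIZED VIEW s3_mv AS
SELECT id, name, age FROM s3_source;

-- Option 2: a table that uses the S3 connector directly.
CREATE TABLE s3_table (
    id INT,
    name VARCHAR,
    age INT,
    PRIMARY KEY (id)
)
WITH (
    connector = 's3',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-s3-source',
    s3.credentials.access = 'your-access-key',
    s3.credentials.secret = 'your-secret-key'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);
```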
Read Parquet files from S3
You can use the table function file_scan() to read Parquet files from S3, either a single file or a directory of Parquet files.
Function signature
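The outline below assumes file_scan() takes six positional arguments in this order. Verify the order and argument names against the file_scan() reference for your RisingWave version.

```sql
-- Signature outline, not a runnable statement.
file_scan(
    file_format,                -- e.g. 'parquet'
    storage_type,               -- e.g. 's3'
    s3_region,
    s3_access_key,
    s3_secret_key,
    file_location_or_directory  -- s3:// path to one file or to a directory
)
```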
When reading a directory of Parquet files, the schema is based on the first Parquet file listed. Ensure that all Parquet files in the directory have the same schema.
For example, assume you have a Parquet file named sales_data.parquet that stores a company’s sales data in the following fields:
- product_id: Product ID
- sales_date: Sales date
- quantity: Sales quantity
- revenue: Sales revenue
You can use the following SQL statement to read this Parquet file:
Read a single Parquet file
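A sketch of the single-file query, assuming the argument order outlined for file_scan(). The region, keys, bucket, and path are hypothetical placeholders.

```sql
SELECT
    product_id,
    sales_date,
    quantity,
    revenue
FROM file_scan(
    'parquet',
    's3',
    'us-east-1',                                   -- hypothetical region
    'your-access-key',
    'your-secret-key',
    's3://your-bucket/path/to/sales_data.parquet'  -- hypothetical path
);
```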
If you have several such Parquet files, you can also read them all by specifying their directory:
Read a directory of Parquet files
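A sketch of the directory form, aggregating across all files in a hypothetical directory. Passing the directory as an s3:// path with a trailing slash is an assumption. Because the schema comes from the first file listed, every file is expected to share the sales_data.parquet columns.

```sql
SELECT
    product_id,
    SUM(quantity) AS total_quantity,
    SUM(revenue) AS total_revenue
FROM file_scan(
    'parquet',
    's3',
    'us-east-1',                   -- hypothetical region
    'your-access-key',
    'your-secret-key',
    's3://your-bucket/path/to/'    -- hypothetical directory
)
GROUP BY product_id;
```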
Handle unexpected file types or poorly formatted files
RisingWave attempts to parse every file as CSV or ndjson according to the specified format, regardless of the file's actual type. Parts of a file that cannot be parsed are reported as warnings and discarded, but the source itself does not fail.