Ingest data from Kafka
This topic describes how to connect RisingWave to a Kafka broker that you want to receive data from, and how to specify data formats, schemas, and security (encryption and authentication) settings.
A source is a resource that RisingWave can read data from. You can create a source in RisingWave using the CREATE SOURCE command. To persist the data from the source in RisingWave, use the CREATE TABLE command instead, specifying the connection settings and data format.
Regardless of whether the data is persisted in RisingWave, you can create materialized views to perform analysis or data transformations.
RisingWave supports exactly-once semantics by reading transactional messages only when the associated transaction has been committed. This behavior is built in and cannot be configured.
GUIDED SETUP
RisingWave Cloud provides an intuitive guided setup for creating a Kafka source. For more information, see Create a source using guided setup in the RisingWave Cloud documentation.
Syntax
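As a rough sketch, the statement takes the following general form. This template is assembled from the parameters documented on this page and is not an exhaustive grammar. source_name and the quoted values are placeholders, and the schema option key is written as schema.location, which is an assumption about the exact key name.

```sql
-- Template only: replace the placeholders before running.
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
   connector = 'kafka',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
   message = 'message',
   schema.location = 'location' | schema.registry = 'schema_registry_url'
);
```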
schema_definition: the parenthesized list of column names and data types, optionally with a primary key constraint.
For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE statement.
RisingWave performs primary key constraint checks on tables but not on sources. If you need the checks to be performed, please create a table. For tables with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
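To illustrate the primary key behavior, here is a minimal sketch of a table that persists Kafka data. The table name, columns, topic, and broker address are hypothetical placeholders, not values from this guide.

```sql
-- Hypothetical table: a record arriving with an existing user_id
-- overwrites the stored row, as described above.
CREATE TABLE IF NOT EXISTS user_profiles (
   user_id INT PRIMARY KEY,
   display_name VARCHAR,
   updated_at TIMESTAMP
)
WITH (
   connector = 'kafka',
   topic = 'user_profiles',                        -- placeholder topic name
   properties.bootstrap.server = 'localhost:9092'  -- placeholder broker address
) FORMAT PLAIN ENCODE JSON;
```

Using CREATE SOURCE with the same WITH options would skip both the persistence and the primary key check.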
Parameters
Connector parameters
Field | Notes |
---|---|
topic | Required. Name of the Kafka topic. One source can only correspond to one topic. |
properties.bootstrap.server | Required. Address of the Kafka broker. Format: ip:port,ip:port. |
scan.startup.mode | Optional. The offset mode that RisingWave will use to consume data. The two supported modes are earliest (read from low watermark) and latest (read from high watermark). If not specified, the default value earliest will be used. |
scan.startup.timestamp.millis | Optional. RisingWave will start to consume data from the specified UNIX timestamp (milliseconds). If this field is specified, the value for scan.startup.mode will be ignored. |
group.id.prefix | Optional. Specify a custom group ID prefix for the source. The default prefix is rw-consumer. Each job (materialized view) has a separate consumer group with a generated suffix in the group ID, so the format of the consumer group is {group_id_prefix}-{fragment_id}. The group ID is used to monitor progress in external Kafka tools and for authorization purposes. RisingWave does not rely on committed offsets or join the consumer group. It only reports offsets to the group. |
properties.sync.call.timeout | Optional. Timeout for synchronous calls to Kafka. The default is 5 seconds. |
properties.client.id | Optional. Client ID associated with the Kafka client. |
Other parameters
Field | Notes |
---|---|
data_format | Data format. Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
data_encode | Data encode. Supported encodes: JSON, AVRO, PROTOBUF, CSV. |
message | Message name of the main Message in schema definition. Required for Protobuf. |
location | Web location of the schema file in http://..., https://..., or S3://... format. |
schema.registry | Confluent Schema Registry URL. Example: http://127.0.0.1:8081. |
schema.registry.username | Conditional. User name for the schema registry. It must be specified with schema.registry.password. |
schema.registry.password | Conditional. Password for the schema registry. It must be specified with schema.registry.username. |
access_key | Required if loading descriptors from S3. The access key ID of AWS. |
secret_key | Required if loading descriptors from S3. The secret access key of AWS. |
region | Required if loading descriptors from S3. The AWS service region. |
arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
external_id | Optional. The external id used to authorize access to third-party resources. |
Additional Kafka parameters
When creating a source in RisingWave, you can specify the following Kafka parameters. To set a parameter, add its RisingWave equivalent to the WITH options. For an example of the usage of these parameters, see the JSON example. For additional details on these parameters, see the librdkafka configuration properties.
Kafka parameter name | RisingWave parameter name | Type |
---|---|---|
enable.auto.commit | properties.enable.auto.commit | bool |
enable.ssl.certificate.verification | properties.enable.ssl.certificate.verification | bool |
fetch.max.bytes | properties.fetch.max.bytes | int |
fetch.queue.backoff.ms | properties.fetch.queue.backoff.ms | int |
fetch.wait.max.ms | properties.fetch.wait.max.ms | int |
message.max.bytes | properties.message.max.bytes | int |
queued.max.messages.kbytes | properties.queued.max.messages.kbytes | int |
queued.min.messages | properties.queued.min.messages | int |
receive.message.max.bytes | properties.receive.message.max.bytes | int |
ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
statistics.interval.ms | properties.statistics.interval.ms | int |
To monitor Kafka metrics in Grafana, set properties.statistics.interval.ms to a non-zero value. The granularity is 1000 ms.
The additional Kafka parameters queued.min.messages and queued.max.messages.kbytes are specified with properties.queued.min.messages and properties.queued.max.messages.kbytes, respectively, when creating the source.
Set properties.ssl.endpoint.identification.algorithm to none to skip verification of the broker hostname against its certificate, which can resolve SSL handshake failures. This parameter can be set to either https or none. By default, it is https.
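As an illustration of how these parameters fit into the WITH options, here is a minimal sketch. The source name, columns, topic, broker address, and all numeric values are hypothetical and not tuning recommendations.

```sql
-- Hypothetical source showing additional Kafka parameters with the properties. prefix.
CREATE SOURCE IF NOT EXISTS metrics_src (
   device_id VARCHAR,
   reading DOUBLE PRECISION
)
WITH (
   connector = 'kafka',
   topic = 'device_metrics',                          -- placeholder topic
   properties.bootstrap.server = 'localhost:9092',    -- placeholder broker
   properties.statistics.interval.ms = '1000',        -- non-zero: emit Kafka metrics
   properties.queued.min.messages = '10000',          -- illustrative value
   properties.queued.max.messages.kbytes = '65536',   -- illustrative value
   properties.fetch.max.bytes = '52428800'            -- illustrative value
) FORMAT PLAIN ENCODE JSON;
```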
Specific parameters for Amazon MSK
Amazon Managed Streaming for Apache Kafka (MSK) requires some specific parameters. For details, see Access MSK in RisingWave.
Examples
Here are examples of connecting RisingWave to a Kafka broker to read data from individual topics.
RisingWave supports reading messages that have been compressed by zstd. Additional configurations are not required.
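For instance, a source that reads JSON messages from a single topic might look like the sketch below. The source name, columns, topic, broker addresses, and group ID prefix are hypothetical.

```sql
-- Hypothetical JSON source reading one topic from two brokers.
CREATE SOURCE IF NOT EXISTS orders_src (
   order_id BIGINT,
   customer_id BIGINT,
   amount DOUBLE PRECISION,
   created_at TIMESTAMP
)
WITH (
   connector = 'kafka',
   topic = 'orders',
   properties.bootstrap.server = '172.10.1.1:9090,172.10.1.2:9090',
   scan.startup.mode = 'latest',     -- start from the high watermark
   group.id.prefix = 'analytics'     -- consumer groups become analytics-{fragment_id}
) FORMAT PLAIN ENCODE JSON;
```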
Query Kafka timestamp
Every Kafka source exposes a virtual column, _rw_kafka_timestamp, which holds the timestamp of each Kafka message.
You can include this column in your views or materialized views to display the Kafka timestamp. Here is an example.
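A minimal sketch of such a materialized view, assuming a source named source_name with a column col1 (both hypothetical):

```sql
-- Expose the Kafka message timestamp alongside a data column.
CREATE MATERIALIZED VIEW v1 AS
SELECT _rw_kafka_timestamp, col1
FROM source_name;
```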
If directly querying from the source, you can use _rw_kafka_timestamp to filter messages sent within a specific time period. For example, the following query only selects messages sent in the past 10 minutes.
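A sketch of that query, again assuming a hypothetical source named source_name:

```sql
-- Keep only messages whose Kafka timestamp falls within the last 10 minutes.
SELECT * FROM source_name
WHERE _rw_kafka_timestamp > now() - interval '10 minute';
```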
Read schemas from locations
For Kafka data in Protobuf format, RisingWave supports reading schemas from a web location (in http://..., https://..., or S3://... format) or from a Confluent Schema Registry. For Avro, schemas can be read from a Confluent Schema Registry or AWS Glue Schema Registry.
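For Protobuf, if a schema location is specified, the schema file must be a FileDescriptorSet, which can be compiled from a .proto file with protoc by using the --descriptor_set_out option (add --include_imports so that imported definitions are bundled into the output).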
To specify a schema location, add this clause to a CREATE SOURCE statement.
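As a sketch, a Protobuf source that loads its FileDescriptorSet from a web location could look like this. The source name, topic, broker address, message name, and URL are hypothetical, and the option key schema.location is an assumption based on the location parameter described earlier.

```sql
-- Hypothetical Protobuf source; the schema file is a compiled FileDescriptorSet.
CREATE SOURCE IF NOT EXISTS proto_src
WITH (
   connector = 'kafka',
   topic = 'proto_events',
   properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE PROTOBUF (
   message = 'package.message_name',                            -- main message in the schema
   schema.location = 'https://example.com/schemas/schema.pb'    -- placeholder URL
);
```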
If a primary key also needs to be defined, use the table constraint syntax.
Read schemas from Confluent Schema Registry
Confluent and Karapace Schema Registry provide a serving layer for your metadata. They provide a RESTful interface for storing and retrieving your schemas.
RisingWave supports reading schemas from a Confluent Schema Registry. The latest schema will be retrieved from the specified Confluent Schema Registry using the TopicNameStrategy strategy when the CREATE SOURCE statement is issued. Then the schema parser in RisingWave will automatically determine the columns and data types to use in the source.
To specify the Confluent Schema Registry, add this clause to a CREATE SOURCE statement.
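A minimal sketch of an Avro source that reads its schema from a registry follows. The source name, topic, and broker address are hypothetical; the registry URL reuses the example value from the parameter table.

```sql
-- Hypothetical Avro source; no schema_definition is given because the
-- columns are derived from the schema fetched from the registry.
CREATE SOURCE IF NOT EXISTS avro_src
WITH (
   connector = 'kafka',
   topic = 'avro_events',
   properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
   schema.registry = 'http://127.0.0.1:8081'
);
```

If the registry requires credentials, schema.registry.username and schema.registry.password would be supplied together in the same clause.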
To learn more about Confluent Schema Registry and how to set up a Schema Registry, refer to the Confluent Schema Registry documentation.
To learn more about Karapace Schema Registry and how to get started, see Get started with Karapace.
If a primary key also needs to be defined, use the table constraint syntax.
Schema evolution
Based on the compatibility type that is configured for the schema registry, some changes are allowed without changing the schema to a different version. In this case, RisingWave will continue using the original schema definition. To use a newer version of the writer schema in RisingWave, you need to drop and recreate the source.
To learn about compatibility types for Schema Registry and the changes allowed, see Compatibility Types.
Read schemas from AWS Glue Schema Registry
PREMIUM EDITION FEATURE
This is a Premium Edition feature. All Premium Edition features are available out of the box without additional cost on RisingWave Cloud. For self-hosted deployments, users need to purchase a license key to access this feature. To purchase a license key, contact the sales team at sales@risingwave-labs.com.
For a full list of Premium Edition features, see RisingWave Premium Edition.
AWS Glue Schema Registry is a serverless feature of AWS Glue that allows you to centrally manage and enforce schemas for data streams, enabling data validation and compatibility checks. It helps in improving the quality of data streams by providing a central repository for managing and enforcing schemas across various AWS services and custom applications.
You can specify the following configurations in the ENCODE AVRO (...) clause to read schemas from Glue.
Auth-related configurations:
- aws.region: The region of the AWS Glue Schema Registry. For example, us-west-2.
- aws.credentials.access_key_id: Your AWS access key ID.
- aws.credentials.secret_access_key: Your AWS secret access key.
- aws.credentials.role.arn: The Amazon Resource Name (ARN) of the role to assume. For example, arn:aws:iam::123456123456:role/MyGlueRole.
  - This IAM role shall be granted permissions for the action glue:GetSchemaVersion.
ARN to the schema:
- aws.glue.schema_arn: The ARN of the schema in AWS Glue Schema Registry. For example, 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'.
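Put together, a source that reads its Avro schema from Glue might be sketched as follows. The source name, topic, broker address, and credential values are hypothetical placeholders; the ARNs reuse the example values above, with the region chosen to match the schema ARN.

```sql
-- Hypothetical Avro source using AWS Glue Schema Registry.
CREATE SOURCE IF NOT EXISTS glue_src
WITH (
   connector = 'kafka',
   topic = 'my_events',
   properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE AVRO (
   aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
   aws.region = 'ap-southeast-1',
   aws.credentials.access_key_id = 'your_access_key_id',          -- placeholder
   aws.credentials.secret_access_key = 'your_secret_access_key',  -- placeholder
   aws.credentials.role.arn = 'arn:aws:iam::123456123456:role/MyGlueRole'
);
```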
Create source with PrivateLink connection
If your Kafka source service is located in a different VPC from RisingWave, use AWS PrivateLink to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see Create an AWS PrivateLink connection.
To create a Kafka source with a PrivateLink connection, in the WITH section of your CREATE SOURCE or CREATE TABLE statement, specify the following parameters.
Parameter | Notes |
---|---|
privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers, in JSON format. List the targets in the same order as the brokers in the properties.bootstrap.server field, because each target is matched to the broker at the same position. If the order is incorrect, there will be connectivity issues. |
privatelink.endpoint | The DNS name of the VPC endpoint. If you’re using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a PrivateLink connection. |
Here is an example of creating a Kafka source using a PrivateLink connection. Notice that {"port": 9094} corresponds to the broker broker1-endpoint, {"port": 9095} corresponds to the broker broker2-endpoint, and {"port": 9096} corresponds to the broker broker3-endpoint.
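A sketch consistent with that description is shown below. The table name, columns, topic, and endpoint DNS name are hypothetical placeholders; only the ports and broker names come from the text above.

```sql
-- Hypothetical table over PrivateLink: the n-th target maps to the n-th broker.
CREATE TABLE IF NOT EXISTS crypto_source (
   product_id VARCHAR,
   price NUMERIC,
   trade_time TIMESTAMP
)
WITH (
   connector = 'kafka',
   topic = 'crypto',
   privatelink.endpoint = 'vpce-example.vpce-svc-example.us-east-1.vpce.amazonaws.com',  -- placeholder DNS name
   privatelink.targets = '[{"port": 9094}, {"port": 9095}, {"port": 9096}]',
   properties.bootstrap.server = 'broker1-endpoint,broker2-endpoint,broker3-endpoint',
   scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```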
TLS/SSL encryption and SASL authentication
RisingWave can read Kafka data that is encrypted with Transport Layer Security (TLS) and/or authenticated with SASL.
Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.
RisingWave supports these SASL authentication mechanisms:
- SASL/PLAIN
- SASL/SCRAM
- SASL/GSSAPI
- SASL/OAUTHBEARER
SSL encryption can be used concurrently with SASL authentication mechanisms.
To learn about how to enable SSL encryption and SASL authentication in Kafka, including how to generate the keys and certificates, see the Security Tutorial from Confluent.
You need to specify encryption and authentication parameters in the WITH section of a CREATE SOURCE statement.
To read data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SOURCE statement.
Parameter | Notes |
---|---|
properties.security.protocol | Set to SSL. |
properties.ssl.ca.location | |
properties.ssl.certificate.location | |
properties.ssl.key.location | |
properties.ssl.key.password | |
For the definitions of the parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the properties. prefix that RisingWave requires.
Here is an example of creating a table that reads SSL-encrypted Kafka data without using SASL authentication.
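The following is a minimal sketch of such a statement. The table name, columns, topic, broker address, file paths, and key password are all hypothetical placeholders.

```sql
-- Hypothetical table reading over SSL only (no SASL).
CREATE TABLE IF NOT EXISTS table_1 (
   column1 VARCHAR,
   column2 INTEGER
)
WITH (
   connector = 'kafka',
   topic = 'quickstart-events',
   properties.bootstrap.server = 'localhost:9093',
   scan.startup.mode = 'latest',
   properties.security.protocol = 'SSL',
   properties.ssl.ca.location = '/path/to/ca-cert',               -- placeholder path
   properties.ssl.certificate.location = '/path/to/client.pem',   -- placeholder path
   properties.ssl.key.location = '/path/to/client.key',           -- placeholder path
   properties.ssl.key.password = 'your_key_password'              -- placeholder secret
) FORMAT PLAIN ENCODE JSON;
```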