Embedded Python UDFs
You can define embedded Python UDFs, which will be executed in an embedded Python interpreter within RisingWave. The Python code is directly included within the CREATE FUNCTION statement.
Currently, embedded Python UDFs only support pure computational logic and do not support accessing external networks or file systems. If you need to access external services or resources, you can use Python UDFs as external functions.
Define your functions
You can create Python UDFs using the CREATE FUNCTION command. Refer to the syntax below:
For example, the scalar function gcd can be defined as follows:
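A minimal sketch of the Python body for such a function, assuming a declaration along the lines of gcd(int, int) returning int. Only the Python part is shown; in RisingWave it would be embedded in the CREATE FUNCTION statement. The Euclidean algorithm is used here as one straightforward way to compute the result.

```python
# Python body of a hypothetical embedded UDF declared as gcd(int, int) -> int.
# The function name matches the name declared in CREATE FUNCTION.
def gcd(a, b):
    # Euclidean algorithm: replace (a, b) with (b, a mod b) until b is zero.
    while b != 0:
        a, b = b, a % b
    return a
```

Because the body is plain Python, it can be tried in a local interpreter before it is registered as a UDF.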
The Python code must contain a function that has the same name as declared in the CREATE FUNCTION statement. The function’s parameters and return type must match those declared in the CREATE FUNCTION statement; otherwise, an error may occur when the function is called.
See the correspondence between SQL types and Python types in the Data type mapping.
Because Python code is only checked when it runs, the correctness of the source code cannot be verified when the function is created. Verify your implementation with batch queries before using the UDF in materialized views. If an error occurs while a UDF executes in a materialized view, all output results will be NULL.
For table functions, your function needs to return an iterator using the yield statement. For example, to generate a sequence from 0 to n-1:
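A minimal sketch of such a generator, assuming the table function is declared with the hypothetical name series and takes a single integer argument. Each yielded value would become one output row.

```python
# Python body of a hypothetical table function series(n int).
def series(n):
    # Yield the integers 0, 1, ..., n-1 one at a time; each yield is one row.
    for i in range(n):
        yield i
```

Using yield keeps the function lazy: rows are produced one at a time rather than built up in a list first.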
If your function returns structured types, the Python function should return an object or dictionary containing structured data. For example, to parse key-value pairs in a string, both of the following implementations work:
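The two variants might look like the sketch below, assuming a struct with fields named key and value and input of the form "key=value". The names key_value_obj and key_value_dict are hypothetical and only distinguish the variants here; in an actual UDF, the function must carry the name declared in CREATE FUNCTION.

```python
# Variant 1: return an object whose attributes carry the struct fields.
class KeyValue:
    def __init__(self, key, value):
        self.key = key
        self.value = value

def key_value_obj(s):
    # Split on the first '=' only, so the value may itself contain '='.
    key, value = s.split("=", 1)
    return KeyValue(key, value)

# Variant 2: return a dictionary keyed by the struct field names.
def key_value_dict(s):
    key, value = s.split("=", 1)
    return {"key": key, "value": value}
```

The dictionary form is shorter; the class form may be preferable when the same structure is built in several places.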
Define your aggregate functions
You can create aggregate functions using the CREATE AGGREGATE command. Refer to the syntax below:
In the function_body, the code should define several functions to implement the aggregate function.
Required functions:
create_state() -> state
: Create a new state.
accumulate(state, *args) -> state
: Accumulate a new value into the state, returning the updated state.
Optional functions:
finish(state) -> value
: Get the result of the aggregate function. If not defined, the state is returned as the result.
retract(state, *args) -> state
: Retract a value from the state, returning the updated state. If not defined, the state cannot be updated incrementally in materialized views and performance may be affected.
The following command creates an aggregate function named weighted_avg to calculate the weighted average.
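A minimal sketch of the Python body for such an aggregate, assuming it takes two arguments (value and weight) and that SQL NULL arrives as None. Only the four functions are shown; in RisingWave they would be embedded in the CREATE AGGREGATE statement. The state layout, a pair of running sums, is an illustrative choice.

```python
def create_state():
    # State is a pair: (sum of value * weight, sum of weights).
    return (0, 0)

def accumulate(state, value, weight):
    # Assumption: NULL inputs are passed as None; such rows are skipped.
    if value is None or weight is None:
        return state
    s, w = state
    return (s + value * weight, w + weight)

def retract(state, value, weight):
    # Inverse of accumulate, which allows incremental updates.
    if value is None or weight is None:
        return state
    s, w = state
    return (s - value * weight, w - weight)

def finish(state):
    s, w = state
    # Return None when no weight has been accumulated, avoiding division by zero.
    if w == 0:
        return None
    return s / w
```

Keeping both running sums in the state is what makes retract possible: removing a row only requires subtracting its contribution.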
Limitations
Currently, embedded Python UDFs are only allowed to use the following standard libraries: json, decimal, re, math, datetime. Other modules, including third-party libraries, are not supported. Embedded Python UDFs cannot access external resources, and the following built-in functions are also not allowed: breakpoint, exit, eval, help, input, open, print.
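As an illustration of staying within these limits, the sketch below uses only re and decimal and none of the disallowed built-ins. The function name extract_amount and its behavior are hypothetical, and returning None for a missing match assumes that None is mapped to SQL NULL.

```python
import re
from decimal import Decimal

# Matches an optionally signed integer or decimal number such as "12.50".
_NUMBER = re.compile(r"-?\d+(?:\.\d+)?")

def extract_amount(s):
    # Find the first number in the string and return it as a Decimal.
    match = _NUMBER.search(s)
    if match is None:
        # Assumption: None is returned to the caller as SQL NULL.
        return None
    return Decimal(match.group(0))
```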
Data type mapping
The following table shows the data type mapping between SQL and Python:
SQL Type | Python Type | Notes |
---|---|---|
BOOLEAN | bool | |
SMALLINT | int | |
INT | int | |
BIGINT | int | |
REAL | float | |
DOUBLE PRECISION | float | |
DECIMAL | decimal.Decimal | |
DATE | datetime.date | Not supported yet. |
TIME | datetime.time | Not supported yet. |
TIMESTAMP | datetime.datetime | Not supported yet. |
TIMESTAMPTZ | datetime.datetime | Not supported yet. |
INTERVAL | MonthDayNano / (int, int, int) | Not supported yet. |
VARCHAR | str | |
BYTEA | bytes | |
JSONB | bool, int, float, list, dict | |
T[] | list[T] | |
STRUCT<> | class or dict | |
…others | | Not supported yet. |
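To make the mapping concrete, here is a minimal sketch of a function that receives an array as a Python list and returns a struct as a dictionary. The name summarize and the field names total and mean are hypothetical; the sketch assumes a declaration taking INT[] and returning a struct with those two fields.

```python
# Python body of a hypothetical UDF taking INT[] and returning a struct.
def summarize(values):
    # INT[] arrives as a list of int values.
    total = sum(values)
    # Guard against an empty array to avoid division by zero.
    mean = total / len(values) if values else 0.0
    # The struct is returned as a dict keyed by the struct field names.
    return {"total": total, "mean": mean}
```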