SubmitSql
Submit a SQL statement to Flink SQL Gateway.
This task submits a SQL statement to Apache Flink through the SQL Gateway. No JAR file is required because Flink executes the SQL directly.
type: "io.kestra.plugin.flink.SubmitSql"
Examples
Execute a streaming SQL query
id: flink-sql-streaming
namespace: company.team
tasks:
  - id: run-sql
    type: io.kestra.plugin.flink.SubmitSql
    gatewayUrl: "http://flink-sql-gateway:8083"
    statement: |
      INSERT INTO enriched_orders
      SELECT o.order_id, o.customer_id, c.name, o.amount, o.order_time
      FROM orders o
      JOIN customers c ON o.customer_id = c.id
    sessionConfig:
      catalog: "default_catalog"
      database: "default_database"
      configuration:
        execution.runtime-mode: "streaming"
        execution.checkpointing.interval: "30s"
Execute a batch SQL query
id: flink-sql-batch
namespace: company.team
tasks:
  - id: run-batch-sql
    type: io.kestra.plugin.flink.SubmitSql
    gatewayUrl: "http://flink-sql-gateway:8083"
    statement: |
      CREATE TABLE daily_summary AS
      SELECT DATE(order_time) as order_date,
             COUNT(*) as order_count,
             SUM(amount) as total_amount
      FROM orders
      WHERE order_time >= '2024-01-01'
      GROUP BY DATE(order_time)
    sessionConfig:
      configuration:
        execution.runtime-mode: "batch"
Properties
gatewayUrl *Required string
SQL Gateway URL
The base URL of the Flink SQL Gateway, e.g., 'http://flink-sql-gateway:8083'.
statement *Required string
SQL statement
The SQL statement to execute. Supports both DDL and DML statements.
acceptableStates array
Acceptable terminal states
List of operation states treated as successful completion. For streaming jobs, include 'RUNNING'; the sessions of jobs accepted in this state are kept alive. For batch jobs, use 'FINISHED'. Defaults to 'FINISHED' and 'RUNNING'.
connectionTimeout integer | string
Default: 30
Connection timeout
Timeout for connecting to the SQL Gateway in seconds. Defaults to 30.
sessionConfig SubmitSql-SessionConfig | string
Session configuration
Session configuration including catalog, database, and Flink configuration properties.
sessionName string
Session name
Optional session name. If omitted, a session with a random name is created.
statementTimeout integer | string
Default: 300
Statement timeout
Timeout for SQL statement execution in seconds. Defaults to 300.
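As a rough illustration of how the optional properties combine, the sketch below submits a batch statement in a named session, with explicit timeouts and only 'FINISHED' accepted as success. The flow id, task id, session name, the order_totals table, and the timeout values are assumptions chosen for this example, not values prescribed by the plugin.

```yaml
id: flink-sql-batch-tuned          # hypothetical flow id
namespace: company.team
tasks:
  - id: run_batch_sql              # hypothetical task id
    type: io.kestra.plugin.flink.SubmitSql
    gatewayUrl: "http://flink-sql-gateway:8083"
    statement: |
      INSERT INTO order_totals
      SELECT customer_id, SUM(amount) AS total_amount
      FROM orders
      GROUP BY customer_id
    sessionName: "nightly-batch"   # assumed name; omit it to get a random session
    connectionTimeout: 10          # seconds allowed for connecting to the gateway
    statementTimeout: 600          # seconds allowed for the statement to execute
    acceptableStates:
      - FINISHED                   # batch job: only completion counts as success
    sessionConfig:
      configuration:
        execution.runtime-mode: "batch"
```

Leaving 'RUNNING' out of acceptableStates is a deliberate choice here: a batch statement that is still running when statementTimeout expires should presumably not be reported as successful.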
Outputs
operationHandle string
The operation handle
The unique identifier for the executed SQL operation
resultCount integer
Result count
Number of rows affected or returned by the operation
sessionHandle string
The session handle
The unique identifier for the SQL Gateway session
status string
Operation status
Final status of the operation
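The outputs can be read by later tasks in the same flow through Kestra's expression syntax. The following is a minimal sketch, assuming a Kestra version that ships the core Log task under io.kestra.plugin.core.log.Log; the flow id and task ids are hypothetical, and the task id uses an underscore so that it can be referenced with dot notation.

```yaml
id: flink-sql-outputs              # hypothetical flow id
namespace: company.team
tasks:
  - id: run_sql
    type: io.kestra.plugin.flink.SubmitSql
    gatewayUrl: "http://flink-sql-gateway:8083"
    statement: |
      INSERT INTO enriched_orders
      SELECT o.order_id, o.customer_id, c.name, o.amount, o.order_time
      FROM orders o
      JOIN customers c ON o.customer_id = c.id
  - id: log_result
    type: io.kestra.plugin.core.log.Log   # assumed core logging task
    # Reads the handles and final status produced by the run_sql task
    message: "Operation {{ outputs.run_sql.operationHandle }} in session {{ outputs.run_sql.sessionHandle }} ended with status {{ outputs.run_sql.status }}"
```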