How do executeSql queries run in Apache Flink when triggered from a ProcessFunction? How are the SQL tasks managed?
02:38 07 Mar 2025

I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written; Flink reads from it and executes them. I have connected the rule stream to a ProcessFunction, and inside the process function I was somehow able to get hold of the tableEnv and call executeSql.

ruleStream.process(new ProcessFunction<Rule, String>() {

            private transient StreamTableEnvironment tableEnv;

            @Override
            public void open(Configuration parameters) {
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build();

                tableEnv = StreamTableEnvironment.create(
                        StreamExecutionEnvironment.getExecutionEnvironment(),
                        settings);

                tableEnv.executeSql("CREATE CATALOG my_catalog WITH ('type'='generic_in_memory')");
                tableEnv.executeSql("USE CATALOG my_catalog");
            }

            @Override
            public void processElement(Rule rule, Context context, Collector<String> collector) throws Exception {
                synchronized (this) { // Ensure sequential execution
                    System.out.println(rule);
                    String ruleId = rule.getId();
                    String operation = rule.getOperation();
                    String ruleStatement = rule.getRuleExpression();
                    // The TableEnvironment created in main() is saved in a separate class and accessed here
                    StreamTableEnvironment tableEnv = FlinkTableEnvSingleton.getTableEnv();

                    try {
                        switch (operation) {
                            case "CREATE":
                                LOG.info("Creating rule: {}", rule);
                                String viewName = "rule" + ruleId;
                                tableEnv.executeSql(ruleStatement);

                                tableEnv.executeSql("CREATE TABLE rule_sink" + ruleId + " ("
                                        + "message STRING"
                                        + ") WITH ("
                                        + "'connector' = 'kafka', "
                                        + "'topic' = 'outputTopic', "
                                        + "'properties.bootstrap.servers' = 'localhost:9092', "
                                        + "'format' = 'raw'"
                                        + ")");

                                tableEnv.executeSql("INSERT INTO rule_sink" + ruleId + " SELECT * FROM " + viewName);
                                collector.collect("Rule data inserted into Kafka topic.");
                                break;
                            default:
                                collector.collect("Unknown operation: " + operation);
                        }
                    } catch (Exception e) {
                        collector.collect("Error executing SQL: " + e.getMessage());
                    }
                }
            }

        }).setParallelism(10); // Ensure single-threaded execution
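For reference, the FlinkTableEnvSingleton referenced in processElement is just a static holder, roughly like the sketch below (Object stands in for StreamTableEnvironment here so the sketch compiles standalone; in the real application the field is a StreamTableEnvironment that main() sets before the job graph is built):

```java
public final class FlinkTableEnvSingleton {
    // In the real application this holds a StreamTableEnvironment;
    // main() calls setTableEnv(...) once, before the job is submitted.
    private static volatile Object tableEnv;

    private FlinkTableEnvSingleton() {}

    public static void setTableEnv(Object env) {
        tableEnv = env;
    }

    public static Object getTableEnv() {
        return tableEnv;
    }
}
```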

An example rule which I passed above is:

{"id": "1", "name": "HighTemperatureRule", "equipmentName": "SensorA", "operation": "CREATE", "ruleExpression": "CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable;"}
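The Rule objects on the stream are deserialized from JSON payloads like the one above; the POJO looks roughly like this (a sketch, with field names inferred from the payload and the getters used in processElement):

```java
public class Rule {
    // Fields mirror the JSON payload; getters match those used in processElement.
    private String id;
    private String name;
    private String equipmentName;
    private String operation;      // e.g. "CREATE"
    private String ruleExpression; // the SQL statement to execute

    public Rule() {} // no-arg constructor for JSON deserialization

    public Rule(String id, String name, String equipmentName,
                String operation, String ruleExpression) {
        this.id = id;
        this.name = name;
        this.equipmentName = equipmentName;
        this.operation = operation;
        this.ruleExpression = ruleExpression;
    }

    public String getId() { return id; }
    public String getOperation() { return operation; }
    public String getRuleExpression() { return ruleExpression; }

    @Override
    public String toString() {
        return "Rule{id='" + id + "', operation='" + operation + "'}";
    }
}
```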

My question:
After doing the USE CATALOG and related setup, I was somehow able to get this running. I was able to execute multiple SQL statements and verify that they ran properly, though I am not completely sure how it works. If I look at the DAG, it always looks the same no matter how many rules I add. So how is it actually running? What manages its runtime — is the ProcessFunction handling it? How much parallelism is a single SQL query assigned? And if possible, can anybody explain the difference between this and normal Flink SQL (with Zeppelin etc.), where queries are submitted via the SQL client? [Flink DAG screenshot]

It is working, but I need an explanation of how it works and its underlying architecture.

apache-flink flink-streaming flink-sql amazon-kinesis-analytics flink-table-api