I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written; Flink reads from it and executes them. I have connected the rule stream to a process function, and inside it I was somehow able to get hold of the tableEnv and call executeSql.
ruleStream.process(new ProcessFunction<Rule, String>() {

    @Override
    public void processElement(Rule rule, Context context, Collector<String> collector) throws Exception {
        synchronized (this) { // Ensure sequential execution within this subtask
            System.out.println(rule);
            String ruleId = rule.getId();
            String operation = rule.getOperation();
            String ruleStatement = rule.getRuleExpression();
            // I am saving the TableEnv generated in the main function in a separate class and accessing it
            StreamTableEnvironment tableEnv = FlinkTableEnvSingleton.getTableEnv();
            try {
                switch (operation) {
                    case "CREATE":
                        LOG.info("Creating rule: {}", rule);
                        String viewName = "rule" + ruleId;
                        tableEnv.executeSql(ruleStatement);
                        tableEnv.executeSql("CREATE TABLE rule_sink" + ruleId + " ("
                                + "message STRING"
                                + ") WITH ("
                                + "'connector' = 'kafka', "
                                + "'topic' = 'outputTopic', "
                                + "'properties.bootstrap.servers' = 'localhost:9092', "
                                + "'format' = 'raw'"
                                + ")");
                        tableEnv.executeSql("INSERT INTO rule_sink" + ruleId + " SELECT * FROM " + viewName);
                        collector.collect("Rule data inserted into Kafka topic.");
                        break;
                    default:
                        collector.collect("Unknown operation: " + operation);
                }
            } catch (Exception e) {
                collector.collect("Error executing SQL: " + e.getMessage());
            }
        }
    }

    private transient StreamTableEnvironment tableEnv;

    @Override
    public void open(Configuration parameters) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        tableEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(),
                settings);
        tableEnv.executeSql("CREATE CATALOG my_catalog WITH ('type'='generic_in_memory')");
        tableEnv.executeSql("USE CATALOG my_catalog");
    }
}).setParallelism(10);
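For context, FlinkTableEnvSingleton is just a static holder class. A minimal sketch of the pattern (an Object stands in for the StreamTableEnvironment here so the sketch compiles without Flink on the classpath; names match my code, the body is simplified):

```java
// Sketch of the FlinkTableEnvSingleton holder referenced above.
// In the real job the stored value is the StreamTableEnvironment
// created in main(); a String stands in for it in this demo.
final class FlinkTableEnvSingleton {
    private static volatile Object tableEnv; // StreamTableEnvironment in the real job

    private FlinkTableEnvSingleton() {}

    // Called once from main() after StreamTableEnvironment.create(...)
    static void setTableEnv(Object env) {
        tableEnv = env;
    }

    static Object getTableEnv() {
        if (tableEnv == null) {
            throw new IllegalStateException("TableEnvironment not initialised yet");
        }
        return tableEnv;
    }
}

public class HolderDemo {
    public static void main(String[] args) {
        FlinkTableEnvSingleton.setTableEnv("tableEnv-created-in-main");
        System.out.println(FlinkTableEnvSingleton.getTableEnv());
    }
}
```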
An example rule that I passed in is:
{"id": "1", "name": "HighTemperatureRule", "equipmentName": "SensorA", "operation": "CREATE", "ruleExpression": "CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable;"}
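Concretely, for this example rule (id = 1), the three executeSql calls in the CREATE branch boil down to the following statements (shown here with the string concatenation expanded):

```sql
-- the ruleExpression carried by the rule itself
CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable;

-- the per-rule Kafka sink table
CREATE TABLE rule_sink1 (
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'outputTopic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'raw'
);

-- the statement that actually starts the streaming query
INSERT INTO rule_sink1 SELECT * FROM rule1;
```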
My question:
After the USE CATALOG and related setup, I somehow got this running. I was able to submit multiple SQL statements and verify that they executed properly, though I am not completely sure how it works. If I look at the DAG, it always looks the same no matter how many rules I add. So how is it actually running — is the process function managing its runtime? How much parallelism is a single SQL query assigned? And, if possible, can anybody explain the difference between this and normal Flink SQL (e.g. via Zeppelin or the SQL client, which take queries through the sql-client)? FlinkDAG
It is working, but I need an explanation of how it works and its underlying architecture.