Json2StarRocks 实现增删改

背景

需要利用StarRocks1.19.0以及之后版本的Primary Key模型

本文阐述json数据导入StarRocks,实现insert、update、delete

操作

StarRocks DDL

CREATE TABLE cdc_db.`cdc_table` (
  `pk` int(20) NOT NULL COMMENT "",
  `col0` varchar(64) REPLACE NULL COMMENT ""
) ENGINE=OLAP 
PRIMARY KEY(`pk`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`pk`) BUCKETS 1 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

Stream load

json数据

[
        {"pk":1,"col0":"aaca"},
        {"pk":2,"col0":"aaba"},
        {"pk":3,"col0":"aa4a"},
        {"pk":4,"col0":"aa33"},
        {"pk":5,"col0":"aaa3"}
]

Upsert

curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.pk\", \"$.col0\"]" -H "strip_outer_array: true" -T json.text http://172.26.194.184:9011/api/cdc_db/cdc_table/_stream_load

Delete

curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.pk\", \"$.col0\"]" -H "strip_outer_array: true" -H "columns:__op='delete'" -T json.text http://172.26.194.184:9011/api/cdc_db/cdc_table/_stream_load

Routine load

Kakfa数据中有标识:upsert 0 或delete 1

{"pk":1,"col0":"aaca","op_type":0}
{"pk":2,"col0":"aaba","op_type":0}
{"pk":3,"col0":"aa4a","op_type":0}
{"pk":1,"col0":"aaca","op_type":1}
{"pk":3,"col0":"aa4a","op_type":1}
CREATE ROUTINE LOAD cdc_db.cdc_task1 ON cdc_table
COLUMNS(pk,col0,__op)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.data.pk\",\"$.data.col0\",\"$.op_type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "cdc-data",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Mysql binlog

当前不支持data中为多个数组的形式

{"data":[{"col1":12,"col2":"adag"}],"database":"mysql_source","es":1634222249000,"id":644,"isDdl":false,"mysqlType":{"col1":"int(20)","col0":"varchar(64)"},"old":[{"col0":"dagdg"}],"pkNames":["host"],"sql":"","sqlType":{"host":12,"collect_time":-5},"table":"query_collect_time","ts":1634222249860,"type":"UPDATE"}
CREATE ROUTINE LOAD cdc_db.task2 ON cdc_table
COLUMNS(pk,col0,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.data[0].col1\",\"$.data[0].col2\",\"$.type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "mysql-data",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Oracle log

注意:当前这种模式没办法支持delete

标准格式

CREATE ROUTINE LOAD cdc_db.task3 ON cdc_table
COLUMNS(pk,col0,temp,__op=(CASE temp WHEN "D" THEN 1 ELSE 0 END))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.after.ID\",\"$.after.NAME\",\"$.op_type\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.26.92.141:9092",
    "kafka_topic" = "test_ogg",
    "property.group.id" = "starrocks-group",
    "property.client.id" = "starrocks-client",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
1赞

你好,当前只支持data中是单个数组格式的,多个数组暂时不支持,可以用flink-cdc直接同步mysql到starrocks

请发下是哪个demo呢

请问下您目前使用的是哪个版本呢?