按下面流程执行,默认服务对象是 bethune 仓库中的 Kafka 日志监控任务。
先确认用户给了哪些输入。若信息不全,只问最小必需项:
若用户已经给出“按任务34类似模式”,默认理解为:
parseAndFormatLogTime()、safe()、Hive 分区补齐、4 份 config 同步更新的处理方式
若任务更接近 35 或 36 这类列表展开模式,按列表展开规则处理。详细模式见 references/bethune-patterns.md。
MessageModel、Po、config.properties 键位,确认命名和字段顺序。
MessageModel、Po、Job。
src/main/resources/config.properties
src/main/resources/dev/config.properties
src/main/resources/product/config.properties
src/main/resources/stage/config.properties
mvn -DskipTests compile 验证新增任务。
TableSchema、StarRocksSinkRowBuilder、toHiveRow() 三处字段顺序完全一致。
st 永远放在输出首列;Hive 行末尾永远追加 year、month、day。
module 过滤值写死在 Job 类常量里,不写入配置。
logTime 统一走 A 方案:为空或解析失败都记录错误日志并丢弃。
message 为空直接丢弃。
id 优先取 skyNetVo.getId(),为空时生成 UUID。
cnt 通常固定为 1。
safe() 兜底,数值字段保留原始数值类型。
null 或空集合时,整条消息直接丢弃。
MessageModel:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}MessageModel.java
Po:src/main/java/com/ly/tms/po/carSupply/SkynetLog{BizName}Po.java
Job:src/main/java/com/ly/tms/job/Bus_{BizName}_KafkaToStarRock_{任务编号}.java
配置键遵循 bethune 现有分组:
kafka.bus.{biz}.topic
travel.car.{biz}.group
starrocks.fe.travel.common.{tableKey}
hive.hive_train_ops.{tableKey}
datas、fullPriceList 这类列表:按 35/36 模式在 flatMap() 中逐项展开。
MessageModel 上补 @JSONField(name = "...")。
完成后至少说明:
读取 references/bethune-patterns.md 获取以下内容:
parseAndFormatLogTime() 与 toHiveRow() 的固定模板
cnt INT,
traceid STRING,
{其余字段按 PO 顺序}
)
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
STORED AS TEXTFILE;
### StarRocks
CREATE TABLE TCTravelStreamData_db.{表名} (
st DATETIME,
apmtraceid VARCHAR(256),
id VARCHAR(256),
cnt INT,
traceid VARCHAR(256),
{其余字段:STRING→VARCHAR(512), INT→INT, DOUBLE→DOUBLE}
)
DUPLICATE KEY(st, apmtraceid)
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES ("replication_num" = "3");
---
## 参考示例(已实现任务)
| 任务 | 类名 | Topic | Module | List展开字段 | SR表名 |
|------|------|-------|--------|------------|--------|
| 33 | Bus_Search_Abtest_KafkaToStarRock_33 | skynet_log_Public_SFC_ABTest_Monitor | BUS_Public_SFC_ABTest_Monitor | 无 | bus_sfc_abtest_monitor |
| 34 | Bus_Search_ReplacePrice_KafkaToStarRock_34 | skynet_log_Public_SFC_Replace_Price_Monitor | BUS_Public_SFC_Replace_Price_Monitor | 无(ReferPriceBean嵌套) | bus_sfc_replace_price_monitor |
| 35 | Bus_Carpool_CalEnter_KafkaToStarRock_35 | skynet_log_3304590_CallEnter | BUS_PUBLIC_CARPOOL_PRICING_CallEnter | fullPriceList | bus_carpool_calenter_monitor |
| 36 | Bus_Metric_Collection_KafkaToStarRock_36 | skynet_log_3309435_bus_travelmetrics | BUS_METRIC_COLLECTION | datas | bus_metric_collection_monitor |
共 1 个版本