跳到主要内容

Kafka信道导入

业务场景

  1. 已有Kafka数据源,并完成了数据源注册,可快速创建topic和信道作业,完成消费;
  2. 暂无Kafka数据源,可基于平台的Kafka连接信息编写SDK等代码,再进行topic和信道作业创建,完成消费。

前提条件

  • 平台部署完成,产品规格包含实时数据模块,且服务正常
  • 用户拥有实时数据模块权限

使用限制

  • 目标表未识别到主键时仅支持新增插入数据

操作流程

场景一:“已有Kafka数据源”使用流程

  • 第一步:注册Kafka数据源
    进入资源管理 -》数据源管理 ,点击新增kafka数据源,完成配置,并测试连通性通过。
    最佳实践

  • 第二步:创建topic
    进入数据开发 > 实时数据 > 数据信道管理页面,完成“新建Topic”,请根据实际情况选择使用单消息类型还是多消息类型。
    此步骤的重点在于:要选择已注册的Kafka数据源,并按对应数据源中实际已存在的topic格式进行消息类型的字段创建。
    数据信道管理

  • 第三步:创建数据信道作业
    进入数据开发 > 实时数据 > 数据信道作业页面的“新建信道数据作业”按钮,选择已创建的topic,完成页面内容填写信息后保存,再操作栏点击作业【运行】即可实时开始信道数据的消费。
    数据信道作业
    数据信道作业

第四步:作业监控与运维
1)选择数据开发 > 实时数据 > 数据信道作业,在作业列表中选择所关注的作业,在操作列点击“运行详情”可查看作业运行信息,包括通过来源与目标表数据量,以大致了解数据同步情况,并提供运行日志下载。 最佳实践

2)告警通知:若需监控作业状态,特别是运行失败,可在个人中心 > 消息通知中配置告警,支持邮件、钉钉群通知。对象类型选择“实时作业”、对象名称填写需监控的作业名称,通知类型选择“作业运行”,作业运行失败时,可发送消息通知。 最佳实践

场景二:“暂无Kafka数据源”使用流程

  • 第一步:获取Kafka连接信息
    进入数据开发 -》实时数据-》数据信道管理-》连接信息,此为平台内部Kafka的连接信息,手动配置并保存密码,将生成SASL账户,默认支持SASL,加密方式为 SCRAM-SHA-256。 数据信道管理
  • 第二步:基于连接信息编写SDK等代码
    用户需通过获取到的Kafka连接信息,自行编写如SDK代码,实现将数据写入平台的Kafka中,代码示例如下:
package com.hexadb.luban.service.system.config;

import lombok.Data;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProduceDemo {
public static void main(String[] args) {
// SASL机制
testKafkaScram();
// NO SASL机制
testKafkaNoSASL();
}

public static void testKafkaScram() {
// 服务地址
String bootstrapServers = "172.17.0.112:32702";
// 加密方式
String mechanism = "SCRAM-SHA-256";
// 用户名
String username = "test";
// 密码
String password = "test";
// 生产者配置
Properties producerProps = new Properties();
producerProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put("security.protocol", "SASL_PLAINTEXT");
producerProps.put("sasl.mechanism", mechanism);
SASLMechanismEnum mechanismEnum = SASLMechanismEnum.getByName(mechanism);
producerProps.put("sasl.jaas.config", mechanismEnum.getConfig(username, password));

// 构建生产客户端
KafkaProducer producer = new KafkaProducer(producerProps);
KafkaData data = getData();
ProducerRecord<String, String> record = new ProducerRecord<>(data.getTopic(), data.getKey(), data.getValue());
try {
// 同步发送消息
for (int i = 0; i < 4; i++) {
producer.send(record).get();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

public static void testKafkaNoSASL() {
// 服务地址
String bootstrapServers = "172.17.0.112:32702";
// 生产者配置
Properties producerProps = new Properties();
producerProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 构建生产客户端
KafkaProducer producer = new KafkaProducer(producerProps);
KafkaData data = getData();
ProducerRecord<String, String> record = new ProducerRecord<>(data.getTopic(), data.getKey(), data.getValue());
try {
// 同步发送消息
for (int i = 0; i < 4; i++) {
producer.send(record).get();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private static KafkaData getData() {
// topic
String topic = "test_topic";
// 分区标识,可以是时间戳,也可以是业务主键
String key = System.currentTimeMillis() + "";
// 消息内容 json 数据格式
String value = "{\n" +
"\t\"name\":\"hei\",\n" +
"\t\"age\":15,\n"
+ "\t\"bear_key\":\"bear_1\"\n"
+ "}";
return new KafkaData(topic, key, value);
}

@Data
public static class KafkaData {
String topic;
String key;
String value;

public KafkaData(String topic, String key, String value) {
this.topic = topic;
this.key = key;
this.value = value;
}
}

public enum SASLMechanismEnum {
SCRAM_SHA_256("SCRAM-SHA-256", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"),
SCRAM_SHA_512("SCRAM-SHA-512", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"),
PLAIN("PLAIN", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";");

private String configFormat;
private String mechanism;

private SASLMechanismEnum(String mechanism, String configFormat) {
this.mechanism = mechanism;
this.configFormat = configFormat;
}

public String getMechanism() {
return this.mechanism;
}

public static SASLMechanismEnum getByName(String name) {
return Arrays.stream(values()).filter((s) -> {
return s.getMechanism().equals(name);
}).findFirst().orElse(null);
}

public String getConfig(String username, String password) {
return String.format(this.configFormat, username, password);
}
}

}
  • 第三步:实时创建topic
    进入数据开发 > 实时数据 > 数据信道管理页面,完成“新建Topic”,请根据实际情况选择使用单消息类型还是多消息类型,更多细节配置可查看信道配置
    • 此步骤的重点在于:无需选择Kafka数据源,可根据实际需要进行消息类型的字段创建,保存后系统会实时在平台的Kafka中创建对应topic。

最佳实践

  • 第四步:创建数据信道作业
    进入数据开发 > 实时数据 > 数据信道作业页面完成数据信道作业的创建,同“场景一”的最后一步基本一致,不再赘述。

数据信道作业

第五步:作业监控与运维
1)选择数据开发 > 实时数据 > 数据信道作业,在作业列表中选择所关注的作业,在操作列点击“运行详情”可查看作业运行信息,包括通过来源与目标表数据量,以大致了解数据同步情况,并提供运行日志下载。 最佳实践

2)告警通知:若需监控作业状态,特别是运行失败,可在个人中心 > 消息通知中配置告警,支持邮件、钉钉群通知。对象类型选择“实时作业”、对象名称填写需监控的作业名称,通知类型选择“作业运行”,作业运行失败时,可发送消息通知。 最佳实践