电脑常识基于SpringBoot使用K stream实现数据埋点\统计\实时更新
今年刚上手一个项目,需求点是文章页面的点赞,收藏,评论和阅读的实时点击统计,并根据这个统计集成给后台,作为后台实现文章日更,时更和用户推送的一个数据埋点.
首先用到的技术是
Kafka流式计算
Kafka除了作为高性能的MQ以外,其实还有一个流式计算的功能,这个功能可以把代码运行中的数据状态以消息的形式发送给统计模块进行分流,聚合和统计输出,自动化完成数据的捕抓和收集;
同时,这种功能需要在项目初期技术选型的时候就确定下来,避免对原代码造成侵入破坏.
MQ采用的是消息队列的方式,所以完全不用担心高并发量的问题,或许唯一使您烦恼的可能是消息生产者和消息消费者之间各种自定的主题名字.
实现思路:
1.数据埋点;
2.Springboot项目整合Kafka
3.Kafka及K Stream原始配置与简化;
4.Kafka Input
5.K Stream流式计算
6.Kafka Output
7.数据收集,处理,更新并返回
具体步骤:
一.基础依赖环境(版本号根据自己项目需求确定)
org.apache.kafkakafka-streams
二.yml中需要配置消息组和服务访问地址(我是用的是Linux系统)
kafka: group: ${spring.application.name} hosts: 192.168.66.133:9092
三.Kafka及K Stream启动配置类----------->
1.KafkaStreamConfig
@Configuration @EnableKafkaStreams //启用kafkastream @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { //最大消息的大小 private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; //kafka所在服务器地址 private String hosts; //kafka所在分组名称 给消费者使用 就是applicationName private String group; public String getHosts() { return hosts; } public void setHosts(String hosts) { this.hosts = hosts; } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } /** * 重新定义默认的KafkaStreams配置属性,包括: * 1、服务器地址 * 2、应用ID * 3、流消息的副本数等配置 * @return */ @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { Mapprops = new HashMap<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 消息副本数量 props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1); props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000); props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase()); return new KafkaStreamsConfiguration(props); } }
2.工厂注册类
/** * KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程 */ @Component public class KafkaStreamListenerFactory implements InitializingBean { Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class); @Autowired DefaultListableBeanFactory defaultListableBeanFactory;//IOC容器本身 /** * 初始化完成后自动调用 */ @Override public void afterPropertiesSet() { Mapmap = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class); for (String key : map.keySet()) { KafkaStreamListener k = map.get(key); KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k); String beanName = k.getClass().getSimpleName()+"AutoProcessor" ; //将对象交给spring容器管理<bean id="beanName" defaultlistablebeanfactory.registersingleton(beanname,processor.doaction());="" logger.info("add="" kafka="" stream="" auto="" listener="" [{}]",beanname);="" }="" }
3.K Stream监听器接口
/**
* 流数据的监听消费者实现的接口类,系统自动会通过
* KafkaStreamListenerFactory类扫描项目中实现该接口的类
* 并注册为流数据的消费端
*
* 其中泛型可是KStream或KTable
*
* @param*/
public interface KafkaStreamListener{
// 流式处理的时候需要监听的主题是什么 INPUTTOPIC
String listenerTopic();
//流式处理完成之后继续发送到的主题是什么 OUTTOPIC
String sendTopic();
// 流式业务的对象处理逻辑
T getService(T stream);
}
4.基本Bean
/**
* KafkaStream自动处理包装类
*/
public class KafkaStreamProcessor {
// 流构建器
StreamsBuilder streamsBuilder;
private String type;
KafkaStreamListener listener;
public KafkaStreamProcessor(StreamsBuilder streamsBuilder, KafkaStreamListener kafkaStreamListener) {
this.streamsBuilder = streamsBuilder;
this.listener = kafkaStreamListener;
this.parseType();
Assert.notNull(this.type, "Kafka Stream 监听器只支持kstream、ktable,当前类型是" + this.type);
}
/**
* 通过泛型类型自动注册对应类型的流处理器对象
* 支持KStream、KTable
*
* @return
*/
public Object doAction() {
if ("kstream".equals(this.type)) {
KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
stream = (KStream) listener.getService(stream);
stream.to(listener.sendTopic());
return stream;
} else {
KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
table = (KTable) listener.getService(table);
table.toStream().to(listener.sendTopic());
return table;
}
}
/**
* 解析传入listener类的泛型类
*/
private void parseType() {
Type[] types = listener.getClass().getGenericInterfaces();
if (types != null) {
for (int i = 0; i < types.length; i++) {
if (types[i] instanceof ParameterizedType) {
ParameterizedType t = (ParameterizedType) types[i];
String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
if (name.contains("org.apache.kafka.streams.kstream.kstream") || name.contains("org.apache.kafka.streams.kstream.ktable")) {
this.type = name.substring(0, name.indexOf('<')).replace("org.apache.kafka.streams.kstream.", "").trim();
break;
}
}
}
}
}
}
5.消息生产者(数据入口)
/**
* 消息生产者
*/
@RestController
@RequestMapping("/sendstream")
public class StreamProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
private static final String INPUT_TOPIC = "input-stream-topic";
@GetMapping
public String sendstream(){
for(int i=1;i<=20;i++){
if(i%2==0){
kafkaTemplate.send(INPUT_TOPIC,"0001"+i,"hello kafka");
}else{
kafkaTemplate.send(INPUT_TOPIC,"0002"+i,"hello stream");
}
}
return "成功";
}
}
6.消息消费者(K Stream数据出口)
/**
* 消息消费者
*/
@Component
public class StreamConsumer {
private static final String OUT_TOPIC = "out-stream-topic";
@KafkaListener(topics =OUT_TOPIC )
public void handleMsg(ConsumerRecordconsumerRecord){
if(consumerRecord!=null){
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("--------"+key+":"+value);
}
}
}
四.数据埋点(简单的数据封装,由生产者发送到消费者)
五.K Stream--->K Table--->K Stream
@Component
public class HotArticleStreamHandler implements KafkaStreamListener<kstream> {
@Override
public String listenerTopic() {//输入主题
return MQConstants.HOT_ARTICLE_INPUT_TOPIC;
}
@Override
public String sendTopic() { //输出主题
return MQConstants.HOT_ARTICLE_OUTPUT_TOPIC;
}
@Override
public KStreamgetService(KStreamstream) {
/**
* 现在的消息格式:
* key value
* "topic" {"data"}
* topic" {"data"}
* .......
*/
KTable<windowed, Long> ktable = stream.flatMapValues(new ValueMapper>() {
@Override
public Iterable<?> apply(String value) {
UpdateArticleMsg articleMsg = JsonUtils.toBean(value, Msg.class);
String tag = Msg;
return Arrays.asList(tag);
}
// 搜集值
}).map(new KeyValueMapper>() {
@Override
public KeyValue<?, ?> apply(String key, Object value) {
return new KeyValue<>(value, value);
}
}).groupByKey()
//设置聚合间隔时间
.windowedBy(TimeWindows.of(Duration.ofSeconds(3)))
.count(Materialized.as("count"));
/**
* 目标输出的格式:
* key value
* value1 500
* value2 300
* value3 4000
*/
KStreamkStream = ktable.toStream().map(new KeyValueMapper<windowed, Long, KeyValue>() {
@Override
public KeyValueapply(Windowed window, Long value) {
String key = window.key().toString();
Msg msg = new Msg();
switch (condition) {
case value1:
streamMsg.setmsg (value1);
break;
case value2:
streamMsg.setmsg (value2);
break;
case value3:
streamMsg.setmsg (value3);
break;
case value4:
streamMsg.setmsg (value4);
break;
}
return new KeyValue<>("", JsonUtils.toString(streamMsg));
}
});
return kStream;
}
}六.数据收集持久化,实时更新功能可选用第三方定时工具定制;
以上就是本篇文章的需求的实现思路和步骤,如果喜欢觉得有用的,请关注一下,下一篇的题材应该是自动化定时工具的介绍.
赞 (0)