1.版本信息:
Flink Version:1.6.2
Kafka Version:0.9.0.0
MySQL Version:5.6.21
2.Kafka 消息样例及格式:[IP TIME URL STATU_CODE REFERER]
1 | 1.74.103.143 2018-12-20 18:12:00 "GET /class/130.html HTTP/1.1" 404 https://search.yahoo.com/search?p=Flink实战 |
3.工程pom.xml
1 | <scala.version>2.11.8</scala.version> |
4.sConf类 定义与MySQL连接的JDBC的参数
1 | package com.soul.conf; |
5.MySQLSlink类
1 | package com.soul.kafka; |
6.数据清洗日期工具类
1 | package com.soul.utils; |
7.MySQL建表
1 | create table log_info( |
8.主程序:
主要是将time的格式转成yyyyMMddHHmmss,
还有取URL中的课程ID,将不是/class开头的过滤掉。
1 | package com.soul.kafka; |
9.启动主程序,查看MySQL表数据在递增
1 | mysql> select count(*) from log_info; |
Kafka过来的消息是我模拟的,一分钟产生100条。
以上是我司生产项目代码的抽取出来的案例代码V1。稍后还有WaterMark之类会做分享。