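Writing the business code that writes to the DM layer

The DM layer holds report data, and for the real-time business it is kept in ClickHouse. Here it stores the results of a Flink job that reads the Kafka topic "KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC" and runs a windowed analysis: a 10-second tumbling window counts the visits to each product, together with its first-level and second-level categories, within that window, and the results are written to ClickHouse in real time.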
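The windowing described above can be modelled in plain Scala, outside Flink, to make the counting rule concrete. This is a simplified sketch, not the Flink job itself: `BrowseEvent` and `TumblingCountSketch` are hypothetical names, timestamps are plain epoch milliseconds, and each window is identified by its start, i.e. the timestamp rounded down to a multiple of 10 000 ms.

```scala
// Simplified in-memory model of a 10-second tumbling-window count (not Flink code).
// BrowseEvent is a hypothetical stand-in for one record of the DWS browse-log topic.
case class BrowseEvent(tsMillis: Long, firstCat: String, secondCat: String, product: String)

object TumblingCountSketch {
  val WindowMillis: Long = 10000L

  // A tumbling window is identified by its start: the timestamp rounded down
  // to a multiple of the window size, so every event lands in exactly one window
  def windowStart(ts: Long, size: Long = WindowMillis): Long = ts - Math.floorMod(ts, size)

  // Counts events per (window start, (first category, second category, product))
  def countPerWindow(events: Seq[BrowseEvent]): Map[(Long, (String, String, String)), Int] =
    events
      .groupBy(e => (windowStart(e.tsMillis), (e.firstCat, e.secondCat, e.product)))
      .map { case (key, es) => key -> es.size }
}
```

Keying by a tuple rather than a single string joined with "-" means a category or product name that itself contains "-" cannot shift the fields when the key is split back apart.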
1. Writing the code

For the complete code, see "ProcessBrowseLogInfoToDM.scala"; the overall logic is as follows:
```scala
// Imports assume a Flink 1.12-era setup with the Scala DataStream and Table bridge APIs;
// DateUtil, MyClickHouseUtil, BrowseLogWideInfo and ProductVisitInfo are project classes
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

object ProcessBrowseLogInfoToDM {
  def main(args: Array[String]): Unit = {
    // 1. Prepare the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.enableCheckpointing(5000)

    /**
     * 2. Create the Kafka connector and consume the DWS-layer data in Kafka
     */
    tblEnv.executeSql(
      """
        |create table kafka_dws_user_login_wide_tbl (
        |  user_id string,
        |  product_name string,
        |  first_category_name string,
        |  second_category_name string,
        |  obtain_points string
        |) with (
        |  'connector' = 'kafka',
        |  'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC',
        |  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
        |  'scan.startup.mode' = 'earliest-offset', -- other values include latest-offset and group-offsets
        |  'properties.group.id' = 'my-group-id',
        |  'format' = 'json'
        |)
      """.stripMargin)

    /**
     * 3. Every 10 s, count how often each product (with its first- and second-level
     *    category) was browsed, and write the result to ClickHouse
     */
    val dwsTbl: Table = tblEnv.sqlQuery(
      """
        |select user_id, product_name, first_category_name, second_category_name
        |from kafka_dws_user_login_wide_tbl
      """.stripMargin)

    // 4. Convert the Row records into BrowseLogWideInfo objects
    val browseDS: DataStream[BrowseLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl)
      .map(row => {
        val user_id: String = row.getField(0).toString
        val product_name: String = row.getField(1).toString
        val first_category_name: String = row.getField(2).toString
        val second_category_name: String = row.getField(3).toString
        BrowseLogWideInfo(null, user_id, null, product_name, null, null,
          first_category_name, second_category_name, null)
      })

    // Key is "firstCategory-secondCategory-product"; split("-") below assumes no name contains "-"
    val dwsDS: DataStream[ProductVisitInfo] = browseDS
      .keyBy(info => info.first_category_name + "-" + info.second_category_name + "-" + info.product_name)
      // Explicit 10 s processing-time tumbling window (timeWindow is deprecated since Flink 1.12)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .process(new ProcessWindowFunction[BrowseLogWideInfo, ProductVisitInfo, String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[BrowseLogWideInfo],
                             out: Collector[ProductVisitInfo]): Unit = {
          val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString)
          val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString)
          val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString)
          val arr: Array[String] = key.split("-")
          val firstCatName: String = arr(0)
          val secondCatName: String = arr(1)
          val productName: String = arr(2)
          val cnt: Int = elements.size // number of browse events in this window
          out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, secondCatName, productName, cnt))
        }
      })

    /**
     * 5. Write the results to the ClickHouse table dm_product_visit_info:
     * create table dm_product_visit_info(
     *   current_dt String,
     *   window_start String,
     *   window_end String,
     *   first_cat String,
     *   second_cat String,
     *   product String,
     *   product_cnt UInt32
     * ) engine = MergeTree() order by current_dt
     */
    // SQL used to insert rows into ClickHouse
    val insertIntoCkSql = "insert into dm_product_visit_info (current_dt,window_start,window_end,first_cat,second_cat,product,product_cnt) values (?,?,?,?,?,?,?)"
    val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouseUtil.clickhouseSink[ProductVisitInfo](insertIntoCkSql,
      new JdbcStatementBuilder[ProductVisitInfo] {
        // Bind each field to its placeholder, in the same order as the column list
        override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = {
          pst.setString(1, productVisitInfo.currentDt)
          pst.setString(2, productVisitInfo.windowStart)
          pst.setString(3, productVisitInfo.windowEnd)
          pst.setString(4, productVisitInfo.firstCat)
          pst.setString(5, productVisitInfo.secondCat)
          pst.setString(6, productVisitInfo.product)
          pst.setLong(7, productVisitInfo.productCnt)
        }
      })

    // Attach the sink to the stream and start the job
    dwsDS.addSink(ckSink)
    env.execute()
  }
}
```

2. Creating the ClickHouse DM-layer table
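The sink at the end of the job binds seven positional parameters, so the table created in this section must have the same seven columns in the same order. A minimal sketch of that correspondence in plain Scala, assuming `ProductVisitInfo` is a simple case class: its definition is not shown in this article, so the one below is hypothetical, inferred from the fields the job reads, and `DmProductVisitSql` is an illustrative name.

```scala
// Hypothetical reconstruction of ProductVisitInfo, inferred from the fields the sink reads
case class ProductVisitInfo(currentDt: String, windowStart: String, windowEnd: String,
                            firstCat: String, secondCat: String, product: String, productCnt: Int)

object DmProductVisitSql {
  // Column order of dm_product_visit_info, as in the DDL
  val columns: Seq[String] = Seq("current_dt", "window_start", "window_end",
    "first_cat", "second_cat", "product", "product_cnt")

  // Builds "insert into <table> (c1,...,cn) values (?,...,?)" with one placeholder per column
  def insertSql(table: String, cols: Seq[String]): String =
    s"insert into $table (${cols.mkString(",")}) values (${cols.map(_ => "?").mkString(",")})"

  // Parameter values in column order, i.e. what setString/setLong bind to indexes 1..7
  def params(v: ProductVisitInfo): Seq[Any] =
    Seq(v.currentDt, v.windowStart, v.windowEnd, v.firstCat, v.secondCat, v.product, v.productCnt.toLong)
}
```

Deriving both the placeholder list and the parameter order from one column list makes a mismatch between the insert statement and the table definition harder to introduce.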
Before the code is run, the DM-layer product browsing table dm_product_visit_info must be created in ClickHouse. The ClickHouse DDL is as follows:
```shell
# Start ClickHouse on node1
[root@node1 bin]# service clickhouse-server start
# Open the ClickHouse client on node1 (-m enables multi-line queries)
[root@node1 bin]# clickhouse-client -m
```

Then create the DM-layer table from the client on node1:

```sql
create table dm_product_visit_info(
  current_dt String,
  window_start String,
  window_end String,
  first_cat String,
  second_cat String,
  product String,
  product_cnt UInt32
) engine = MergeTree() order by current_dt;
```

3. Testing the code
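Once the code above is written, test it as follows:

1) Make the job consume Kafka from the beginning. In the code's Kafka connector, set the property "scan.startup.mode" to "earliest-offset" so that the data is read from the earliest offset.

Alternatively, instead of consuming Kafka from the beginning, run "RTMockUserLogData.java", which produces mock logs and sends them to the log-collection interface; this requires the log-collection interface and Flume to be running.

2) Run the code and check the results. After the job runs, query the table "dm_product_visit_info" in the ClickHouse DM layer to see the results.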
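Step 1) relies on "scan.startup.mode" deciding where the Kafka source starts reading each partition. The following is a simplified model of three of its values, not connector code: the type and function names are hypothetical, `logEnd` is the offset the next produced record will receive, and the fallback for a group without a committed offset is a parameter because in practice it is governed by the Kafka consumer property `auto.offset.reset`. The real connector also supports `timestamp` and `specific-offsets`.

```scala
// Simplified model of choosing a start offset for one Kafka partition (not connector code)
sealed trait StartupMode
case object EarliestOffset extends StartupMode // 'earliest-offset'
case object LatestOffset extends StartupMode   // 'latest-offset'
case object GroupOffsets extends StartupMode   // 'group-offsets'

object StartupModeSketch {
  // logStart: oldest retained offset; logEnd: offset of the next record to be written
  def startOffset(mode: StartupMode, logStart: Long, logEnd: Long,
                  committed: Option[Long], fallbackToEarliest: Boolean): Long = mode match {
    case EarliestOffset => logStart // re-reads everything still retained in the topic
    case LatestOffset   => logEnd   // only records produced after the job starts
    case GroupOffsets   =>          // resume where the consumer group left off
      committed.getOrElse(if (fallbackToEarliest) logStart else logEnd)
  }
}
```

When the job is restored from a checkpoint or savepoint, the offsets stored there take precedence over this setting.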
4. Architecture diagram

[Figure: architecture diagram]

This article was published on 2023-02-28 20:04:00.

Article link: http://www.newhan.cn/zhishi/a/167765383675656.html