• <em id="6vhwh"><rt id="6vhwh"></rt></em>

    <style id="6vhwh"></style>

    <style id="6vhwh"></style>
    1. <style id="6vhwh"></style>
        <sub id="6vhwh"><p id="6vhwh"></p></sub>
        <p id="6vhwh"></p>
          1. 国产亚洲欧洲av综合一区二区三区 ,色爱综合另类图片av,亚洲av免费成人在线,久久热在线视频精品视频,成在人线av无码免费,国产精品一区二区久久毛片,亚洲精品成人片在线观看精品字幕 ,久久亚洲精品成人av秋霞

            hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫)

            更新時間:2023-03-01 14:57:16 閱讀: 評論:0

            #頭條創(chuàng)作挑戰(zhàn)賽#

            編寫寫入DM層業(yè)務(wù)代碼

            DM層主要是報表數(shù)據(jù),針對實時業(yè)務(wù)將DM層設(shè)置在Clickhou中,在此業(yè)務(wù)中DM層主要存儲的是通過Flink讀取Kafka “KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC” topic中的數(shù)據(jù)進行設(shè)置窗口分析,每隔10s設(shè)置滾動窗口統(tǒng)計該窗口內(nèi)訪問商品及商品一級、二級分類分析結(jié)果,實時寫入到Clickhou中。

            一、代碼編寫

            具體代碼參照“ProcessBrowLogInfoToDM.scala”,大體代碼邏輯如下:

            object ProcessBrowLogInfoToDM { def main(args: Array[String]): Unit = { //1.準(zhǔn)備環(huán)境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.enableCheckpointing(5000) import org.apache.flink.streaming.api.scala._ /** * 2.創(chuàng)建 Kafka Connector,連接消費Kafka dwd中數(shù)據(jù) * */ tblEnv.executeSql( """ |create table kafka_dws_ur_login_wide_tbl ( | ur_id string, | product_name string, | first_category_name string, | cond_category_name string, | obtain_points string |) with ( | 'connector' = 'kafka', | 'topic' = 'KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC', | 'properties.bootstrap.rvers'='node1:9092,node2:9092,node3:9092', | 'scan.startup.mode'='earliest-offt', --也可以指定 earliest-offt 、latest-offt | 'properties.group.id' = 'my-group-id', | 'format' = 'json' |) """.stripMargin) /** * 3.實時統(tǒng)計每個用戶最近10s瀏覽的商品次數(shù)和商品一級、二級種類次數(shù),存入到Clickhou */ val dwsTbl:Table = tblEnv.sqlQuery( """ | lect ur_id,product_name,first_category_name,cond_category_name from kafka_dws_ur_login_wide_tbl """.stripMargin) //4.將Row 類型數(shù)據(jù)轉(zhuǎn)換成對象類型操作 val browDS: DataStream[BrowLogWideInfo] = tblEnv.toAppendStream[Row](dwsTbl) .map(row => { val ur_id: String = row.getField(0).toString val product_name: String = row.getField(1).toString val first_category_name: String = row.getField(2).toString val cond_category_name: String = row.getField(3).toString BrowLogWideInfo(null, ur_id, null, product_name, null, null, first_category_name, cond_category_name, null) }) val dwsDS: DataStream[ProductVisitInfo] = browDS.keyBy(info => { info.first_category_name + "-" + info.cond_category_name + "-" + info.product_name }) .timeWindow(Time.conds(10)) .process(new ProcessWindowFunction[BrowLogWideInfo, ProductVisitInfo, String, TimeWindow] { override def process(key: String, context: Context, elements: Iterable[BrowLogWideInfo], out: Collector[ProductVisitInfo]): Unit = { val currentDt: String = DateUtil.getDateYYYYMMDD(context.window.getStart.toString) val startTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getStart.toString) val endTime: String = DateUtil.getDateYYYYMMDDHHMMSS(context.window.getEnd.toString) val arr: Array[String] = key.split("-") val firstCatName: String = arr(0) val condCatName: String = arr(1) val productName: String = arr(2) val cnt: Int = elements.toList.size out.collect(ProductVisitInfo(currentDt, startTime, endTime, firstCatName, condCatName, productName, cnt)) } }) /** * 5.將以上結(jié)果寫入到Clickhou表 dm_product_visit_info 表中 * create table dm_product_visit_info( * current_dt String, * window_start String, * window_end String, * first_cat String, * cond_cat String, * product String, * product_cnt UInt32 * ) engine = MergeTree() order by current_dt * */ //準(zhǔn)備向ClickHou中插入數(shù)據(jù)的sql val inrtIntoCkSql = "inrt into dm_product_visit_info (current_dt,window_start,window_end,first_cat,cond_cat,product,product_cnt) values (?,?,?,?,?,?,?)" val ckSink: SinkFunction[ProductVisitInfo] = MyClickHouUtil.clickhouSink[ProductVisitInfo](inrtIntoCkSql,new JdbcStatementBuilder[ProductVisitInfo] { override def accept(pst: PreparedStatement, productVisitInfo: ProductVisitInfo): Unit = { pst.tString(1,productVisitInfo.currentDt) pst.tString(2,productVisitInfo.windowStart) pst.tString(3,productVisitInfo.windowEnd) pst.tString(4,productVisitInfo.firstCat) pst.tString(5,productVisitInfo.condCat) pst.tString(6,productVisitInfo.product) pst.tLong(7,productVisitInfo.productCnt) } }) //針對數(shù)據(jù)加入sink dwsDS.addSink(ckSink) env.execute() }}二、創(chuàng)建Clickhou-DM層表

            代碼在執(zhí)行之前需要在Clickhou中創(chuàng)建對應(yīng)的DM層商品瀏覽信息表dm_product_visit_info,clickhou建表語句如下:

            #node1節(jié)點啟動clickhou[root@node1 bin]# rvice clickhou-rver start#node1節(jié)點進入clickhou[root@node1 bin]# clickhou-client -m#node1節(jié)點創(chuàng)建clickhou-DM層表create table dm_product_visit_info( current_dt String, window_start String, window_end String, first_cat String, cond_cat String, product String, product_cnt UInt32) engine = MergeTree() order by current_dt;三、代碼測試

            以上代碼編寫完成后,代碼執(zhí)行測試步驟如下:

            1、將代碼中消費Kafka數(shù)據(jù)改成從頭開始消費

            代碼中Kafka Connector中屬性“scan.startup.mode”設(shè)置為“earliest-offt”,從頭開始消費數(shù)據(jù)。

            這里也可以不設(shè)置從頭開始消費Kafka數(shù)據(jù),而是直接啟動向日志采集接口模擬生產(chǎn)日志代碼“RTMockUrLogData.java”,需要啟動日志采集接口及Flume。

            2、執(zhí)行代碼,查看對應(yīng)結(jié)果

            以上代碼執(zhí)行后在,在Clickhou-DM層中表“dm_product_visit_info”中查看對應(yīng)數(shù)據(jù)結(jié)果如下:

            四、架構(gòu)圖

            本文發(fā)布于:2023-02-28 20:04:00,感謝您對本站的認可!

            本文鏈接:http://www.newhan.cn/zhishi/a/167765383675656.html

            版權(quán)聲明:本站內(nèi)容均來自互聯(lián)網(wǎng),僅供演示用,請勿用于商業(yè)和其他非法用途。如果侵犯了您的權(quán)益請與我們聯(lián)系,我們將在24小時內(nèi)刪除。

            本文word下載地址:hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫).doc

            本文 PDF 下載地址:hhmm(汗汗嗎漫畫官方在線閱讀頁面免費漫畫入口頁面彈窗漫畫).pdf

            標(biāo)簽:漫畫   頁面   在線閱讀   入口   官方
            相關(guān)文章
            留言與評論(共有 0 條評論)
               
            驗證碼:
            Copyright ?2019-2022 Comsenz Inc.Powered by ? 實用文體寫作網(wǎng)旗下知識大全大全欄目是一個全百科類寶庫! 優(yōu)秀范文|法律文書|專利查詢|
            主站蜘蛛池模板: 99久9在线视频 | 传媒| 综合欧美视频一区二区三区| 亚洲成av人片无码迅雷下载| 国产精品亚洲А∨天堂免下载| 狠狠躁夜夜躁人人爽天天bl| 国产免费久久精品44| 亚洲av无码精品蜜桃| av中文一区二区三区| 色噜噜av男人的天堂| 色在线 | 国产| 日韩精品人妻系列无码专区免费| 蜜桃无码一区二区三区| 久久91这里精品国产2020| 成人午夜视频在线| 亚洲色大成网站www看下面| 国产成人精品久久一区二| 免费看亚洲一区二区三区| 国产精品中文字幕视频| 欧美一本大道香蕉综合视频| AV在线亚洲欧洲日产一区二区| 精品国产乱码久久久久夜深人妻| 欧美日本精品一本二本三区| 国产一区二区精品网站看黄| 被绑在坐桩机上抹春药| 久久久综合九色合综| 99精品国产成人一区二区| 国产香蕉久久精品综合网| 国产一区二区精品久久呦| 二区中文字幕在线观看| 一区二区传媒有限公司| 欧美国产精品不卡在线观看| 在线中文字幕精品第5页| 亚洲最大成人一区久久久| 少妇粗大进出白浆嘿嘿视频| 99re6这里有精品热视频| 亚洲综合成人一区二区三区| 亚洲午夜爱爱香蕉片| 国产成人精品无码免费看| 毛片无遮挡高清免费| 巨胸美乳无码人妻视频| 成年片免费观看网站|