当前位置: 首页 > news >正文

17网站一起做网店下载怎么创建一个网站

17网站一起做网店下载,怎么创建一个网站,团购网站设计,青岛网页设计师背景 项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。 一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似: def main(…

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

http://www.mmbaike.com/news/83667.html

相关文章:

  • 深圳外贸网站优化哪家好香港旺道旺国际集团
  • 微信房地产网站建设郑州seo关键词自然排名工具
  • 家纺网站建设网站排名
  • 当今弹幕网站建设情况在线科技成都网站推广公司
  • 怎样建立平台sem和seo
  • 海南网站建设推广公司深圳网络推广哪家公司好
  • 自适应网站建设百度推广广告公司
  • 网站友情链接怎么样做怎样制作网站
  • wordpress 编辑器 高亮 引用seo优化服务商
  • 政府门户网站制度建设情况广州专做优化的科技公司
  • 义乌网站建设方案详细百度移动
  • 如何为公司做网站seo综合查询怎么关闭
  • 网站开发pythonseo的含义
  • 广告发布属于什么服务seo推广薪资
  • 行业网站建设教程一键制作免费网站的app
  • 北京高端网站建设优势网站怎么提升关键词排名
  • 网站建设学校培训学校百度怎么注册自己的店铺
  • 邹平县建设局网站宁波网站推广营销
  • 食品网站建设的照片网站怎么推广效果好一点呢
  • 房山手机网站建设河南网站建设
  • 云主机建站免费技能培训网
  • 公司简介50字附子seo教程
  • 无锡网站推企业宣传推广怎么做
  • 网站开发前端课程图片搜索图片识别
  • 科技网站设计公司关键词优化搜索排名
  • 银行党风廉政建设考试网站郑州seo网络营销
  • 网站开发的标准流程小程序推广平台
  • 制作网站软件网页设计代做
  • 网站建设方案书组网方案河南品牌网络推广外包
  • 最好的微网站建设价格企业管理培训班哪个好