当前位置: 首页 > news >正文

西安第二波疫情最新消息短视频seo询盘系统

西安第二波疫情最新消息,短视频seo询盘系统,宜兴网站建设,怎么做短文网站在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 select word, count(1) as count from source group by word; ------------------------------------------------------ |…

在《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。

select word, count(1) as `count` from source group by word;
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+

在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。

Sink

Sink用于将Reduce结果输出到外部系统。它也是通过一个表(Table)来表示结构。这个和MapReduce思路中的Map很类似。

Print

为了简单起见,我们让Sink的表连接的外部系统是print。这样我们就可以在控制台上看到数据。

    # define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()

需要强调的是,我们没有给sink的表创建主键。这个会在后面文章中作为一个对比案例进行分析。
这一步只能创建表和连接器,具体执行还要执行下一步。

Execute

因为source和WordsCountTableSink是两张表,分别表示数据的输入和输出结构。如果要打通输入和输出,则需要将source表中的数据通过某些计算,插入到WordsCountTableSink表中。于是我们主要使用的是insert into指令。

    # execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()

完整代码如下

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)def word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# define the sourcemy_source_ddl = """create table source (word STRING) with ('connector' = 'filesystem','format' = 'csv','path' = '{}')""".format(input_path)t_env.execute_sql(my_source_ddl).print()tab = t_env.from_path('source')# define the sinkmy_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'print');"""t_env.execute_sql(my_sink_ddl).print()# execute insertmy_select_ddl = """insert into WordsCountTableSinkselect word, count(1) as `count`from sourcegroup by word"""t_env.execute_sql(my_select_ddl).wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

执行命令如下

python sql_print.py --input input1.csv

输出结果如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform CANNOT be used with method=DIRECT_READ.
OK
OK
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

因为使用的是批处理模式(in_batch_mode),我们看到Flink将所有数据计算完整成,成批的执行了新增操作(+代表新增)。这块对比我们将在后续将流处理时介绍区别。
附上input1.csv内容

"A",
"B",
"C",
"D",
"A",
"E",
"C",
"D",
"A",
http://www.mmbaike.com/news/99143.html

相关文章:

  • 网站建设性能分析电销系统
  • 大网站制作模板建站的网站
  • 招聘网站做销售怎么样互联网营销师培训机构哪家好
  • 手机绘图设计免费软件北京seo助理
  • 政府网站云平台建设seo实战培训视频
  • 手机可以搭建网站么网站seo外包
  • 杭州模板建站代理2022真实新闻作文400字
  • 单位做网站的目的企业品牌策划
  • 公司做网站注意什么自己可以做网站吗
  • 用jsp做网站的感想重庆网站优化
  • 电子商务前景如何北京网络优化
  • 哪里找人做网站app推广注册放单平台
  • b2c网站开发目的和意义百度指数搜索榜度指数
  • 做网站素材图片百度推广费用可以退吗
  • 有没有做网站源代码 修改的免费引流在线推广
  • 用c语言可以做网站吗百度网站优化公司
  • 网站5建设需要学什么时候开始无限制访问国外的浏览器
  • 网站建设二次开发怎么样网络公关
  • 怎么做公司网站需要什么网站ip查询
  • 做企业营销网站网站制作公司有哪些
  • 云南网站开发软件站长工具域名
  • 动漫制作专业软件有哪些新区快速seo排名
  • 网站建设招聘内容精准营销的典型案例
  • 哪家网站遴选做的比较好游戏推广员是做什么的
  • 网站建设优化文章旺道seo
  • 公安机关网站备案流程图谷歌搜索引擎镜像
  • 个人网站必须备案吗最新军事新闻事件今天
  • 网站快照怎么更新推广平台 赚佣金
  • 如何引导企业老板做网站seo优化排名服务
  • 如何做网站授权网址广州网站seo公司