重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“1、如何用flink的table和sql构建pom文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“1、如何用flink的table和sql构建pom文件”吧!
成都创新互联公司服务项目包括江干网站建设、江干网站制作、江干网页制作以及江干网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,江干网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到江干省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
构建pom文件
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.flink flink-connector-filesystem_${scala.binary.version} ${flink.version} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
2、编写代码
package com.jd.data; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSourcestream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt"); // DataStreamSource stream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = stream.map(new MapFunction () { public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], split[1], split[2]); } }); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用 table api // Table table = tableEnv.fromDataStream(map); // table.printSchema(); // Table select = table.select("a,b"); // 使用 sql api tableEnv.createTemporaryView("test", map); Table select = tableEnv.sqlQuery(" select a, b from test"); DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class); sensorReading2DataStream.map(new MapFunction () { @Override public Object map(SensorReading2 value) throws Exception { System.out.println(value.a+" "+ value.b); return null; } }); env.execute(); } }
package com.jd.data; public class SensorReading { public String a; public String b; public String c; public SensorReading(){ } public SensorReading(String a, String b, String c) { this.a = a; this.b = b; this.c = c; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } public String getC() { return c; } public void setC(String c) { this.c = c; } }
package com.jd.data; public class SensorReading2 { public String a; public String b; public SensorReading2(){ } public SensorReading2(String a, String b) { this.a = a; this.b = b; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } }
注意:pojo 中属性必须是public的, 包含无参构造器
感谢各位的阅读,以上就是“1、如何用flink的table和sql构建pom文件”的内容了,经过本文的学习后,相信大家对1、如何用flink的table和sql构建pom文件这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!