重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
datastream是flink提供给用户使用的用于进行流计算和批处理的api,是对底层流式计算模型的api封装,便于用户编程。
成都创新互联公司长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为吴兴企业提供专业的做网站、网站建设,吴兴网站改版等技术服务。拥有十年丰富建站经验和众多成功案例,为您定制开发。
一个完整的datastream运行模型一般由三部分组成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取(也就是从数据源读取,可以批数据源,也可以是流式数据数据源),Transformation主要负责对属于的转换操作(也就是正常的业务处逻辑),Sink负责最终数据的输出(计算结果的导出)。
一般来说,使用datastream api编写flink程序,包括以下流程:
1、获得一个执行环境;(Execution Environment)
2、加载/创建初始数据;(Source)
3、指定转换这些数据;(Transformation)
4、指定放置计算结果的位置;(Sink)
5、触发程序执行(这是流式计算必须的操作,如果是批处理则不需要)
4.0.0
SparkDemo
SparkDemoTest
1.0-SNAPSHOT
UTF-8
2.11.8
2.7.3
2.11
1.6.1
org.apache.hadoop
hadoop-client
${hadoop.version}
MySQL
mysql-connector-java
8.0.12
junit
junit
4.12
org.apache.logging.log4j
log4j-core
2.9.0
org.apache.flink
flink-java
1.6.1
org.apache.flink
flink-streaming-java_2.11
1.6.1
org.apache.flink
flink-streaming-scala_2.11
1.6.1
org.apache.flink
flink-scala_2.11
1.6.1
org.apache.flink
flink-clients_2.11
1.6.1
org.apache.flink
flink-table_2.11
1.6.1
provided
org.apache.hadoop
hadoop-client
${hadoop.version}
com.alibaba
fastjson
1.2.22
org.apache.flink
flink-connector-kafka-0.10_${scala.binary.version}
${flink.version}
org.scala-tools
maven-scala-plugin
2.15.2
compile
testCompile
maven-compiler-plugin
3.6.0
1.8
org.apache.maven.plugins
maven-surefire-plugin
2.19
true
有三种类型的执行环境:
1、StreamExecutionEnvironment.getExecutionEnvironment()
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
2、StreamExecutionEnvironment.createLocalEnvironment()
返回本地执行环境,需要在调用时指定默认的并行度。
3、StreamExecutionEnvironment.createRemoteEnvironment()
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
1、env.readTextFile(path)
一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取文件作为数据源
DataStreamSource fileSource = env.readTextFile("/tmp/test.txt");
//3、打印数据
fileSource.print();
//4、启动任务执行
env.execute("test file source");
}
}
2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式来读取文件。这里的fileinputformat可以自定义类
package flinktest;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取文件作为数据源
DataStreamSource fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");
//3、打印数据
fileSource.print();
//4、启动任务执行
env.execute("test file source");
}
}
socketTextStream(host,port)
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取socket作为数据源
DataStreamSource sourceSocket = env.socketTextStream("127.0.0.1", 1000);
//3、打印数据
sourceSocket.print();
//4、启动任务执行
env.execute("test socket source");
}
}
1、fromCollection(Collection)
从集合中创建一个数据流,集合中所有元素的类型是一致的。
List list = new ArrayList<>();
DataStreamSource sourceCollection = env.fromCollection(list);
2、fromCollection(Iterator)
从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。
3、fromElements(Object)
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的
4、generateSequence(from, to)
从给定的间隔中并行地产生一个数字序列。读取一定范围的sequnce对象
env.addSource(SourceFuntion)
自定义一个数据源实现类,然后 addSource 到到env中。比如场景的从kafka读取数据,从mysql读取数据
Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。Flink有许多封装在DataStream操作里的内置输出格式。
1、 writeAsText
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。
2、WriteAsCsv
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。
3、print/printToErr
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。
4、 writeUsingOutputFormat
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。
5、writeToSocket
根据SerializationSchema 将元素写入到socket中。
6、stream.addSink(SinkFunction)
使用自定义的sink类
DataStream → DataStream:输入一个参数经过处理产生一个新的参数
DataStream dataStream = //...
dataStream.map(new MapFunction() {
@Override
//这里将每个参数 * 2,然后返回
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。
dataStream.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out)
throws Exception {
//切割字符串,将处理之后的数据放到 collector 中。
for(String word: value.split(" ")){
out.collect(word);
}
}
});
DataStream → DataStream:计算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:
dataStream.filter(new FilterFunction() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
DataStream → KeyedStream:要求输入是tuple,或者是一个复合对象,里面有多个属性(例如student类,里面有name、age等2个以上的属性),反正就是必须有作为key和value的数据。根据key进行分区,相同key的在同一个分区,在内部使用hash实现。
//有不同方式指定key
dataStream.keyBy("someKey") // 指定key的字段名称,常用于复合对象中
dataStream.keyBy(0) // 指定key的位置,常用于tuple中
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果,也就是每一次聚合的结果都会返回,直到最后一次聚合结束,所以不是只返回最后一个聚合结果。
keyedStream.reduce(new ReduceFunction() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
KeyedStream → DataStream
一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。
DataStream result =
keyedStream.fold("start", new FoldFunction() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
运行结果为:
假设数据源为 (1,2,3,4,5)
结果为:start-1,start-1-2......
KeyedStream →DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。
1、connect:
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
DataStream someStream = //...
DataStream otherStream = //...
ConnectedStreams connectedStreams = someStream.connect(otherStream);
2、coMap、coFlatMap
ConnectedStreams → DataStream:专门用于connect之后的stream操作的map和flatmap算子。
connectedStreams.map(new CoMapFunction() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction() {
@Override
public void flatMap1(Integer value, Collector out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
split:
DataStream → SplitStream:将一个数据流拆分成两个或者多个数据流.并且会给每个数据流起一个别名
select:SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
SplitStream split = someDataStream.split(new OutputSelector() {
@Override
public Iterable select(Integer value) {
List output = new ArrayList();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
split.select("even").print();
split.select("odd").print();
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。这和connect不一样,connect并没有合并多个stream
dataStream.union(otherStream1, otherStream2, ...);