重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇内容主要讲解“Spark的基础知识点有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spark的基础知识点有哪些”吧!
站在用户的角度思考问题,与客户深入沟通,找到焉耆网站设计与焉耆网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:成都网站建设、做网站、企业官网、英文网站、手机端网站、网站推广、申请域名、虚拟主机、企业邮箱。业务覆盖焉耆地区。
Spark使用简练优雅的Scala语言编写,基于Scala提供了交互式编程体验,同时提供多种方便易用的API。Spark遵循“一个软件栈满足不同应用场景”的设计理念,逐渐形成了一套完整的生态系统(包括 Spark提供内存计算框架、SQL即席查询(Spark SQL)、流式计算(Spark Streaming)、机器学习(MLlib)、图计算(Graph X)等),Spark可以部署在yarn资源管理器上,提供一站式大数据解决方案,可以同时支持批处理、流处理、交互式查询。
MapReduce计算模型延迟高,无法胜任实时、快速计算的需求,因而只适用于离线场景,Spark借鉴MapReduce计算模式,但与之相比有以下几个优势(快、易用、全面):
Spark作为计算框架只是取代了Hadoop生态系统中的MapReduce计算框架,它任需要HDFS来实现数据的分布式存储,Hadoop中的其他组件依然在企业大数据系统中发挥着重要作用。
Spark的不足:虽然Spark很快,但现在在生产环境中仍然不尽人意,无论扩展性、稳定性、管理性等方面都需要进一步增强;同时Spark在流处理领域能力有限,如果要实现亚秒级或大容量的数据获取或处理需要其他流处理产品。
Cloudera旨在让Spark流数据技术适用于80%的使用场合,就考虑到这一缺陷,在实时分析(而非简单数据过滤或分发)场景中,很多以前使用S4或Storm等流式处理引擎的实现已经逐渐被Kafka+Spark Streaming代替;
Hadoop现在分三块HDFS/MR/YARN,Spark的流行将逐渐让MapReduce、Tez走进博物馆;Spark只是作为一个计算引擎比MR的性能要好,但它的存储和调度框架还是依赖于HDFS/YARN,Spark也有自己的调度框架,但不成熟,基本不可商用。
YARN实现了一个集群多个框架”,即在一个集群上部署一个统一的资源调度管理框架,并部署其他各种计算框架,YARN为这些计算框架提供统一的资源调度管理服务,并且能够根据各种计算框架的负载需求调整各自占用的资源,实现集群资源共享和资源弹性收缩;
并且,YARN实现集群上的不同应用负载混搭,有效提高了集群的利用率;不同计算框架可以共享底层存储,避免了数据集跨集群移动 ;
这里使用Spark on Yarn 模式部署,配置on yarn模式只需要修改很少配置,也不用使用启动spark集群命令,需要提交任务时候须指定在yarn上。
Spark运行需要Scala语言,须下载Scala和Spark并解压到家目录,设置当前用户的环境变量(~/.bash_profile),增加SCALA_HOME和SPARK_HOME路径并立即生效;启动scala命令和spark-shell命令验证是否成功;Spark的配置文件修改按照官网教程不好理解,这里完成的配置参照博客及调试。
Spark的需要修改两个配置文件:spark-env.sh和spark-default.conf,前者需要指明Hadoop的hdfs和yarn的配置文件路径及Spark.master.host地址,后者需要指明jar包地址;
spark-env.sh配置文件修改如下:
export JAVA_HOME=/home/stream/jdk1.8.0_144
export SCALA_HOME=/home/stream/scala-2.11.12
export HADOOP_HOME=/home/stream/hadoop-3.0.3
export HADOOP_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop
export YARN_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop
export SPARK_MASTER_HOST=xx
export SPARK_LOCAL_IP=xxx
spark-default.conf配置修改如下:
//增加jar包地址
spark.yarn.jars=hdfs://1xxx/spark_jars/*
该设置表明将jar地址定义在hdfs上,必须将~/spark/jars路径下所有的jar包都上传到hdfs的/spark_jars/路径(hadoop hdfs –put ~/spark/jars/*),否则会报错无法找到编译jar包错误;
直接无参数启动./spark-shell ,运行的是本地模式:
启动./spark-shell –master yarn,运行的是on yarn模式,前提是yarn配置成功并可用:
在hdfs文件系统中创建文件README.md,并读入RDD中,使用RDD自带的参数转换,RDD默认每行为一个值:
使用./spark-shell --master yarn启动spark 后运行命令:val textFile=sc.textFile(“README.md”)读取hdfs上的README.md文件到RDD,并使用内置函数测试如下,说明spark on yarn配置成功.
在启动spark-shell时候,报错Yarn-site.xml中配置的最大分配内存不足,调大这个值为2048M,需重启yarn后生效。
设置的hdfs地址冲突,hdfs的配置文件中hdfs-site.xml设置没有带端口,但是spark-default.conf中的spark.yarn.jars值带有端口,修改spark-default.conf的配置地址同前者一致:
在实际应用中,大数据处理主要包括以下三个类型:复杂的批量数据处理:通常时间跨度在数十分钟到数小时之间;基于历史数据的交互式查询:通常时间跨度在数十秒到数分钟之间;基于实时数据流的数据处理:通常时间跨度在数百毫秒到数秒之间;
同时存在以上场景需要同时部署多个组件,如:MapReduce/Impala/Storm,这样做难免会带来一些问题:不同场景之间输入输出数据无法做到无缝共享,通常需要进行数据格式的转换,不同的软件需要不同的开发和维护团队,带来了较高的使用成本,比较难以对同一个集群中的各个系统进行统一的资源协调和分配;
Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,其生态系统包含了Spark Core、Spark SQL、Spark Streaming( Structured Streaming)、MLLib和GraphX 等组件,既能够提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算等。
而且Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案;因此,Spark所提供的生态系统足以应对上述三种场景,即批处理、交互式查询和流数据处理。
RDD:是Resilient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型 ;
DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系 ;
Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task ;
应用(Application):用户编写的Spark应用程序;
任务( Task ):运行在Executor上的工作单元 ;
作业( Job ):一个作业包含多个RDD及作用于相应RDD上的各种操作;
阶段( Stage ):是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集;
Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor),资源管理器可以自带或使用Mesos/YARN;
一个应用由一个Driver和若干个作业构成,一个作业由多个阶段构成,一个阶段由多个没有Shuffle关系的任务组成;
当执行一个应用时,Driver会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给Driver,或者写到HDFS或者其他数据库中。
SparkContext对象代表了和一个集群的连接:
(1)首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控;
(2)资源管理器为Executor分配资源,并启动Executor进程;
(3)SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理;Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,并提供应用程序代码;
(4)Task在Executor上运行,把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源;
许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是不同计算阶段之间会重用中间结果, MapReduce框架把中间结果写入到稳定存储(如磁盘)中,带来大量的数据复制、磁盘IO和序列化开销。
RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,开发者不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储。一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。
RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD。
RDD提供了丰富的操作以支持常见数据运算,分“转换”(Transformation)和“动作”(Action)两种类型;RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫),表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(比如MapReduce、SQL、Pregel);Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作
RDD典型的执行过程如下,这一系列处理称为一个Lineage(血缘关系),即DAG拓扑排序的结果:
Spark采用RDD以后能够实现高效计算的原因主要在于:
(1)高容错性:血缘关系、重新计算丢失分区、无需回滚系统、重算过程在不同节点之间并行、只记录粗粒度的操作;
(2)中间结果持久化到内存:数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销;
(3)存放的数据是Java对象:避免了不必要的对象序列化和反序列化;
Spark通过分析各个RDD的依赖关系生成了DAG,并根据RDD 依赖关系把一个作业分成多个阶段,阶段划分的依据是窄依赖和宽依赖,窄依赖可以实现流水线优化,宽依赖包含Shuffle过程,无法实现流水线方式处理。
窄依赖表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区;宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。
逻辑上每个RDD 操作都是一个fork/join(一种用于并行执行任务的框架),把计算fork 到每个RDD 分区,完成计算后对各个分区得到的结果进行join 操作,然后fork/join下一个RDD 操作;
RDD Stage划分:Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage,具体方法:
通过上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,总结一下RDD在Spark架构中的运行过程:
(1)创建RDD对象;
(2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
(3)DAG Scheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行;
RDD的创建可以从从文件系统中加载数据创建得到,或者通过并行集合(数组)创建RDD。Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址;
从文件系统中加载数据:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
从HDFS中加载数据:
scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
scala>val array = Array(1,2,3,4,5)
scala>val rdd = sc.parallelize(array)
或者从列表中创建:
scala>val list = List(1,2,3,4,5)
scala>val rdd = sc.parallelize(list)
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用,转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作;
常用的RDD转换操作,总结如下:
filter(func)操作:筛选出满足函数func的元素,并返回一个新的数据集
scala> val lines =sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)
scala> val linesWithSpark=lines.filter(line => line.contains("Spark"))
map(func)操作:map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集
scala> data=Array(1,2,3,4,5)
scala> val rdd1= sc.parallelize(data)
scala> val rdd2=rdd1.map(x=>x+10)
另一个实例:
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.map(line => line.split(" "))
flatMap(func)操作:拍扁操作
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
scala> val words=lines.flatMap(line => line.split(" "))
groupByKey()操作:应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集;
reduceByKey(func)操作:应用于(K,V)键值对的数据集返回新(K, V)形式数据集,其中每个值是将每个key传递到函数func中进行聚合后得到的结果:
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,这就是惰性机制,“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算,常用的行动操作:
Spark RDD采用惰性求值的机制,但是每次遇到行动操作都会从头开始执行计算,每次调用行动操作都会触发一次从头开始的计算,这对于迭代计算而言代价是很大的,迭代计算经常需要多次重复使用同一组数据:
scala> val list = List("Hadoop","Spark","Hive")
scala> val rdd = sc.parallelize(list)
scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算
scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
可以通过持久化(缓存)机制避免这种重复计算的开销,可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用;
persist()的圆括号中包含的是持久化级别参数,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容;persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上;一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY),同时可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。
针对上面的实例,增加持久化语句以后的执行过程如下:
scala> val list = List("Hadoop","Spark","Hive")
scala> val rdd = sc.parallelize(list)
scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区分别保存在不同的节点上,分区的作用:(1)增加并行度(2)减少通信开销。RDD分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
Standalone或YARN:在“集群中所有CPU核心数目总和”和2二者中取较大值作为默认值;
设置分区的个数有两种方法:创建RDD时手动指定分区个数,使用reparititon方法重新设置分区个数;
创建RDD时手动指定分区个数:在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如 sc.textFile(path, partitionNum),其中path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。
scala> val array = Array(1,2,3,4,5)
scala> val rdd = sc.parallelize(array,2) //设置两个分区
reparititon方法重新设置分区个数:通过转换操作得到新 RDD 时,直接调用 repartition 方法即可,如:
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
scala> data.partitions.size //显示data这个RDD的分区数量
scala> val rdd = data.repartition(1) //对data这个RDD进行重新分区
scala> rdd.partitions.size
res4: Int = 1
完成Spark部署后,使用spark-shell指令进入Scala交互编程界面,spark-shell默认创建一个sparkContext(sc),在spark-shell启动时候可以查看运行模式是on yarn还是local模式,使用交互式界面可以直接引用sc变量使用;
使用Spark-shell处理数据实例:读取HDFS文件系统中文件实现WordCount 单词计数:
sc.textFile("hdfs://172.22.241.183:8020/user/spark/yzg_test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
其中,map((_,1)) 等同于map(x => (x, 1))
使用saveAsText File()函数可以将结果保存到文件系统中。
Spark采用Scala语言编写,在开发中需要熟悉函数式编程思想,并熟练使用Scala语言,使用Scala进行Spark开发的代码量大大少于Java开发的代码量;
函数式编程属于声明式编程的一种,将计算描述为数学函数的求值,但函数式编程没有准确的定义,只是一系列理念,并不需要严格准守,可以理解为函数式编程把程序看做是数学函数,输入的是自变量,输出因变量,通过表达式完成计算,当前越来越多的命令式语言支持部分的函数式编程特性。
在函数式编程中,函数作为一等公民,就是说函数的行为和普通变量没有区别,可以作为函参进行传递,也可以在函数内部声明一个函数,那么外层的函数就被称作高阶函数。
函数式编程的curry化:把接受多个参数的函数变换成接受一个单一参数的函数,返回接受余下的参数并且返回结果的新函数。
函数式编程要求所有的变量都是常量(这里所用的变量这个词并不准确,只是为了便于理解),erlang是其中的典型语言,虽然许多语言支持部分函数式编程的特性,但是并不要求变量必须是常量。这样的特性提高了编程的复杂度,但是使代码没有副作用,并且带来了很大的一个好处,那就是大大简化了并发编程。
Java中最常用的并发模式是共享内存模型,依赖于线程与锁,若代码编写不当,会发生死锁和竞争条件,并且随着线程数的增加,会占用大量的系统资源。在函数式编程中,因为都是常量,所以根本就不用考虑死锁等情况。为什么说一次赋值提高了编程的复杂度,既然所有变量都是常量,那么我们没办法更改一个变量的值,循环的意义也就不大,所以haskell与erlang中使用递归代替了循环。
Scala即可伸缩的语言(Scalable Language),是一种多范式的编程语言,类似于java的编程,设计初衷是要集成面向对象编程和函数式编程的各种特性。
在Scala中函数是一等公民,像变量一样既可以作为函参使用,也可以将函数赋值给一个变量;而且函数的创建不用依赖于类、或对象,在Java中函数的创建则要依赖于类、抽象类或者接口。Scala函数有两种定义:
Scala的函数定义规范化写法,最后一行代码是它的返回值:
精简后函数定义可以只有一行:
也可以直接使用val将函数定义成变量,表示定义函数addInt,输入参数有两个,分别为x,y,均为Int类型,返回值为两者的和,类型Int:
Scala中的匿名函数也叫做函数字面量,既可以作为函数的参数使用,也可以将其赋值给一个变量,在匿名函数的定义中“=>”可理解为一个转换器,它使用右侧的算法,将左侧的输入数据转换为新的输出数据,使用匿名函数后,我们的代码变得更简洁了。
val test = (x:Int) => x + 1
Scala使用术语“高阶函数”来表示那些把函数作为参数或函数作为返回结果的方法和函数。比如常见的有map,filter,reduce等函数,它们可以接受一个函数作为参数。
Scala中的闭包指的是当函数的变量超出它的有效作用域的时候,还能对函数内部的变量进行访问;Scala中的闭包捕获到的是变量的本身而不仅仅是变量的数值,当自由变量发生变化时,Scala中的闭包能够捕获到这个变化;如果自由变量在闭包内部发生变化,也会反映到函数外面定义的自由变量的数值。
部分应用函数只是在“已有函数”的基础上,提供部分默认参数,未提供默认参数的地方使用下划线替代,从而创建出一个“函数值”,在使用这个函数值(部分应用函数)的时候,只需提供下划线部分对应的参数即可;部分应用函数本质上是一种值类型的表达式,在使用的时候不需要提供所有的参数,只需要提供部分参数。
scala中的柯里化指的是将原来接受两个参数的函数变成新的接受一个参数的函数的过程,新的函数返回一个以原有第二个参数作为参数的函数;
def someAction(f:(Double)=>Double) = f(10)
只要满足:函数参数是一个double、返回值也是一个double,这个函数就可以作为f值;
Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高,但Shark的设计导致了两个问题:
Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责 ;
Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据,Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范 ;
RDD就像一个屋子,找东西要把这个屋子翻遍才能找到;DataFrame相当于在你的屋子里面打上了货架,只要告诉他你是在第几个货架的第几个位置, DataFrame就是在RDD基础上加入了列,处理数据就像处理二维表一样。
DataFrame与RDD的主要区别在于,前者带schema元信息,即DataFrame表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
Spark2.0版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能;
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持;可以通过如下语句创建一个SparkSession对象:
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
在创建DataFrame前,为支持RDD转换为DataFrame及后续的SQL操作,需通过import语句(即import spark.implicits._)导入相应包,启用隐式转换。
在创建DataFrame时,可使用spark.read操作从不同类型的文件中加载数据创建DataFrame,如:spark.read.json("people.json"):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径;spark.read.csv("people.csv"):读取people.csv文件创建DataFrame;
读取hdfs上的json文件,并打印,json文件为:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
读取代码:
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().getOrCreate()
import spark.implicits._
val df =spark.read.json("hdfs://172.22.241.183:8020/user/spark/json_sparksql.json")
df.show()
Spark官网提供了两种方法来实现从RDD转换得到DataFrame:① 利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;② 使用编程接口,构造一个schema并将其应用在已知的RDD上;
SparkSQL 的元数据的状态有两种:① in_memory,用完了元数据也就丢了;② 通过hive保存,hive的元数据存在哪儿,它的元数据也就存在哪,SparkSQL数据仓库建立在Hive之上实现的,使用SparkSQL去构建数据仓库的时候,必须依赖于Hive。
Spark-sql命令行提供了即席查询能力,可以使用类sql方式操作数据源,效率高于hive。
Spark Streaming是Spark Core扩展而来的一个高吞吐、高容错的实时处理引擎,同Storm的最大区别在于无法实现毫秒级计算,而Storm可以实现毫秒级响应,Spark Streaming 实现方式是批量计算,按照时间片对stream切割形成静态数据,并且基于RDD数据集更容易做高效的容错处理。Spark Streaming的输入和输出数据源可以是多种。Spark Streaming 实时读取数据并将数据分为小批量的batch,然后在spark引擎中处理生成批量的结果集。Spark Streaming提供了称为离散流或DStream的高级抽象概念,它表示连续的数据流。DStreams既可以从Kafka、Flume等源的输入数据流创建,也可以通过在其他DStreams上应用高级操作创建。在内部DStream表示为RDD序列。
在这里从一个例子开始介绍,StreamingContext是所有的流式计算的主要实体,创建含有两个执行线程的本地StreamingContext和1秒钟的batch,然后创建一个Dstream(lines)用于监听TCP端口,lines中的每一行就是一个RDD,flatMap函数将一个RDD分解成多个记录,是一对多的Dstream操作,这里使用空格将lines分解成单词,words被映射成(word, 1)对,随后进行词频统计,例子的代码如下:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
Spark Streaming提供了称为离散流或DStream的高级抽象,它表示连续的数据流,在内部DStream表示为RDD序列,每个RDD包含一定间隔的数据,如下图所示:
所有对于DStream的操作都会相应地转换成对RDDs的操作,在上面的例子中,flatMap操作被应用到lines 中的每个RDD中生成了一组RDD(即words)
总结编写Spark Streaming程序的基本步骤是:
1.通过创建输入DStream来定义输入源
2.通过对DStream应用转换操作和输出操作来定义流计算
3.用streamingContext.start()来开始接收数据和处理流程
4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
5.可以通过streamingContext.stop()来手动结束流计算进程
有两种创建StreamingContext的方式:通过SparkContext创建和通过SparkConf创建;
Spark conf创建:
val conf = new SparkConf().setAppName(appName).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(1));
appName是用来在Spark UI上显示的应用名称。master是Spark、Mesos或Yarn集群的URL,或者是local[*]。batch interval可以根据你的应用程序的延迟要求以及可用的集群资源情况来设置。
SparkContext创建:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
在前面的例子中lines就是从源得到的输入DStream,输入DStream对应一个接收器对象,可以从源接收消息并存储到Spark内存中进行处理。Spark Streaming提供两种streaming源:
在本地运行Spark Streaming程序时,不要使用“local”或“local[1]”作为主URL。这两者中的任何一个都意味着在本地运行任务只使用一个线程。如果使用基于receiver的输入DStream(如Kafka、Flume等),这表明将使用单个线程运行receiver,而不留下用于处理所接收数据的线程。因此在本地运行时,始终使用“local[n]”作为主URL,其中n必须大于运行的receiver数量,否则系统将接收数据,但不能处理它。
Kafka和Flume这类源需要外部依赖包,其中一些库具有复杂的依赖关系,Spark shell中没有这些高级源代码,因此无法在spark-shell中测试基于这些高级源代码的应用程序,但可以手动将包引入;
基于可靠性的考虑,可以将数据源分为两类:可靠的接收器的数据被Receiver 接收后发送确认到源头(如Kafka ,Flume)并将数据存储在spark,不可靠的接收器不会向源发送确认。
与RDD类似,转换操作允许修改来自输入DStream的数据,转换操作包括无状态转换操作和有状态转换操作。
无状态转换操作实例:下节spark-shell中“套接字流”词频统计采用无状态转换,每次统计都只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。
有状态转换操作实例:滑动窗口转换操作和updateStateByKey操作;
一些常见的转换如下:
每次窗口在源DStream上滑动,窗口内的源RDD被组合/操作生成了窗口RDD,在图例中,过去3个时间单位的数据将被操作,并按2个时间单位滑动。
任何窗口操作都需要指定两个参数:窗口长度:窗口的持续时间(图中值是3);滑动间隔:执行窗口操作的间隔(图中值是2)。这两个参数必须是源DStream的批处理间隔的倍数(图中值是1)
举例说明窗口操作:希望通过每隔10秒在最近30秒数据中生成字数来扩展前面的示例。为此,我们必须在最近的30秒数据上对(word,1)的DStream键值对应用reduceByKey操作。这是使用reduceByKeyAndWindow操作完成的。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
所有的滑动窗口操作都需要使用参数:windowLength(窗口长度)和slideInterval(滑动间隔),常见窗口操作总结如下,对应的含义可参照RDD的转换操作:
Window:基于源DStream产生的窗口化的批数据计算得到新的Dstream;
countByWindow: 返回DStream中元素的滑动窗口计数;
reduceByWindow:返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律从而支持并行计算;
reduceByKeyAndWindow(三参数):应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
reduceByKeyAndWindow(四参数):比上述reduceByKeyAndWindow(三参数)更高效的reduceByKeyAndWindow,每个窗口的reduce值是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。
countByValueAndWindow:当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。
updateStateByKey:需要在跨批次之间维护状态时,必须使用updateStateByKey操作;
窗口计算上join操作非常有用,在Spark Streaming中可以轻松实现不同类型的join,包括leftouterjoin、rightouterjoin和fulloterjoin。每个批处理间隔中stream1生成的RDD与stream2生成的RDD关联如下:
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
输出操作允许将DStream的数据推送到外部系统,如数据库或files,由于输出操作触发所有DStream转换的实际执行(类似于RDD的操作),并允许外部系统使用转换后的数据,输出操作有以下几种:
在输出DStream中,Dstream.foreachRDD是一个功能强大的原语.
可以轻松地对流数据使用DataFrames和SQL操作,但必须使用StreamingContext正在用的SparkContext创建SparkSession。下面例子使用DataFrames和SQL生成单词计数。每个RDD都转换为DataFrame,注册为临时表后使用SQL进行查询:
val words: DStream[String] =
words.foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val wordsDataFrame = rdd.toDF("word")
wordsDataFrame.createOrReplaceTempView("words")
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
进入spark-shell后就默认获得了的SparkConext,即sc,从SparkConf对象创建StreamingContext对象,spark-shell中创建StreamingContext对象如下:
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
如果是编写一个独立的Spark Streaming程序,而不是在spark-shell中运行,则需要通过如下方式创建StreamingContext对象:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
文件流可以读取本机文件,也可以读取读取HDFS上文件,如果部署的on yarn模式的Spark,则启动spark-shell默认读取HDFS上对应的: hdfs:xxxx/user/xx/ 下的文件;
scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> val lines = ssc.textFileStream("hdfs://xxx/yzg_test.txt")
scala> val Counts = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
scala> Counts.saveAsTextFiles("hdfs://xxx/bendi"))
scala> ssc.start()
scala> ssc.awaitTermination()
scala> ssc.stop()
以上代码在spark-shell中运行后,每隔5秒读取hdfs上的文件并进行词频统计后写入到hdfs中的“bendi-时间戳”文件夹下,直到ssc.stop();Counts.saveAsTextFiles("file://xxx/bendi"))和Counts.print分别写本地和std输出;
Spark Streaming可以通过Socket端口实时监听并接收数据计算,步骤如下:
driver端创建StreamingContext对象,启动上下文时依次创建JobScheduler和ReceiverTracker,并调用他们的start方法。ReceiverTracker在start方法中发送启动接收器消息给远程Executor,消息内部含有ServerSocket的地址信息。在executor一侧,由Receiver TrackerEndpoint终端接受消息,抽取消息内容,利用sparkContext结合消息内容创建ReceiverRDD对象,最后提交rdd给spark集群。在代码实现上,使用nc –lk 9999 开启 地址172.22.241.184主机的9999监听端口,并持续往里面写数据;使用spark-shell实现监听端口代码如下,输入源为socket源,进行简单的词频统计后,统计结果输出到HDFS文件系统;
scala> import org.apache.spark._
scala> import org.apache.spark.streaming._
scala> import org.apache.spark.storage.StorageLevel
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)
scala> val wordCounts = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)
scala> wordCounts.saveAsTextFiles("hdfs://xxx/bendi-socket"))
scala> ssc.start()
scala> ssc.awaitTermination()
scala> ssc.stop()
Kafka和Flume等高级输入源需要依赖独立的库(jar文件),如果使用spark-shell读取kafka等高级输入源,需要将对应的依赖jar包放在spark的依赖文件夹lib下。
根据当前使用的kafka版本,适配所需要的spark-streaming-kafka依赖的版本,在maven仓库下载,地址如下:https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.2.1
将对应的依赖jar包放在CDH的spark的依赖文件夹lib下,通过引入包内依赖验证是否成功:
scala> import org.apache.spark._
scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.kafka._
scala> val ssc = new StreamingContext(sc, Seconds(5))
scala> ssc.checkpoint("hdfs://usr/spark/kafka/checkpoint")
scala> val zkQuorum = "172.22.241.186:2181"
scala> val group = "test-consumer-group"
scala> val topics = "yzg_spark"
scala> val numThreads = 1
scala> val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
scala> val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
scala> val pair = lineMap.map(_._2).flatMap(_.split(" ")).map((_,1))
scala> val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ -_,Minutes(2),Seconds(10),2)
scala> wordCounts.print
scala> ssc.start
当Spark Streaming需要跨批次间维护状态时,就必须使用updateStateByKey操作。以词频统计为例,对于有状态转换操作而言,当前批次的词频统计是在之前批次的词频统计结果的基础上进行不断累加,所以最终统计得到的词频是所有批次的单词总的词频统计结果。
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
实现:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("hdfs:172.22.241.184:8020//usr/spark/checkpoint")
val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val wordCounts = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey[Int](updateFunc)
wordCounts.saveAsTextFiles("hdfs:172.22.241.184:8020//user/spark/bendi-socket")
ssc.start()
ssc.awaitTermination()
ssc.stop()
关于SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算,在spark1.3版本后kafkaUtils提供两种Dstream创建方法,一种为KafkaUtils.createDstream,另一种为KafkaUtils.createDirectStream。
KafkaUtils.createDstream方式
其构造函数为KafkaUtils.createDstream(ssc,[zk], [consumer group id], [per-topic,partitions] ),使用receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,它同步将接受到数据保存到分布式文件系统上比如HDFS。所以数据在出错的情况下可以恢复出来。
A、创建一个receiver来对kafka进行定时拉取数据,ssc的RDD分区和Kafka的topic分区不是一个概念,故如果增加特定主消费的线程数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量。
B、对于不同的group和topic可以使用多个receivers创建不同的DStream
C、如果启用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)
同时需要设置存储级别(默认StorageLevel.MEMORY_AND_DISK_SER_2),即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
KafkaUtils.createDirectStream方式
在spark1.3之后,引入了Direct方式,不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。
这种方法相较于Receiver方式的优势在于:
简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。
此方法缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。但是您可以在每个批处理中访问此方法处理的偏移量,并自行更新Zookeeper。
参照官方的API文档地址:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html,位置策略是用来控制特定的主题分区在哪个执行器上消费的,在executor针对主题分区如何对消费者进行调度,并且位置的选择是相对的,位置策略有三种方案:
1、PreferBrokers:首选kafka服务器,只有在kafka服务器和executor位于同一主机可以使用该策略。
2、PreferConsistent:首选一致性,多数时候采用该方式,在所有可用的执行器上均匀分配kakfa的主题的所有分区,能够综合利用集群的计算资源。
3、PreferFixed:首选固定模式,如果负载不均衡可以使用该策略放置在特定节点使用指定的主题分区;该配置是手动控制方案,若没有显式指定的分区仍然采用(2)方案。
消费者策略是控制如何创建和配制消费者对象或者如何对Kafka上的消息进行消费界定,比如t1主题的分区0和1,或者消费特定分区上的特定消息段。该类可扩展,自行实现。
1、ConsumerStrategies.Assign:指定固定的分区集合;
def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long])
2、ConsumerStrategies.Subscribe:允许消费订阅固定的主题集合;
3、ConsumerStrategies.SubscribePattern:使用正则表达式指定感兴趣的主题集合。
IDEA作为常用的开发工具使用maven进行依赖包的统一管理,配置Scala的开发环境,进行Spark Streaming的API开发;
下载并破解IDEA,并加入汉化的包到lib,重启生效;
在IDEA中导入离线的Scala插件:需要确保当前win主机上已经下载安装Scala并设置环境变量,首先下载IDEA的Scala插件,无须解压,然后将其添加到IDEA中,具体为new---setting--plugins--"输入scala"--install plugin from disk;
shift键多次------查找类和插件;
shift+ctrl+enter-------结束当前行,自动补全分号;
shift+alter+s-----------setting设置
alter+enter-----------补全抛出的异常
alter+insert---------自动生成get、set、构造函数等;
Ctrl+X --------------删除当前行
ctrl+r----------------替换
ctrl+/----------------多行代码分行注释,每行一个注释符号
ctrl+shift+/---------多行代码注释在一个块里,只在开头和结尾有注释符号
新建maven工程:file--new--project--maven(选择quickstart框架模型新建),groupId和ArtifactID用来区分该java工程;
maven自动生成pom.xml配置文件,用于各种包的依赖和引入,如果使用maven打包,需要引入maven的打包插件:使用maven-compiler-plugin、maven-jar-plugin插件,并在prom.xml中增加指定程序入口的
将mainClass设置为HelloWorld(主类),点击右边窗口maven -> package,生成jar包,打包完成后使用spark-submit指令提交jar包并执行。
spark-submit --class "JSONRead" /usr/local/spark/mycode/json/target/scala-2.11/json-project_2.11-1.0.jar
若有cannot find main class错误,需要删除-class xx.jar选项;若出现“Invalid signature file digest for Manifest main”错误,则运行zip -d xxx.jar 'META-INF/.SF' 'META-INF/.RSA' 'META-INF/*SF' 指令,删除所属jar包中.SF/.RSA/相关文件。任务yarn管理器查看任务运行情况;
在Spark2.x中,spark新开放了一个基于DataFrame的无下限的流式处理组件Structured Streaming,在过去使用streaming时一次处理是当前batch的所有数据,针对这波数据进行各种处理,如果要做一些类似pv,uv的统计,需要借助有状态的state的DStream,或者借助一些分布式缓存系统,如redis,做一些类似Group by的操作Streaming是非常不便的,在面对复杂的流式处理场景时捉襟见肘,且无法支持基于event_time的时间窗口做聚合逻辑。
在Structured Streaming中,把源源不断到来的数据通过固定的模式“追加”或者“更新”到无下限的DataFrame中。剩余的工作跟普通的DataFrame一样,可以去map、filter,也可以去groupby().count(),甚至还可以把流处理的dataframe跟其他的“静态”DataFrame进行join。另外,还提供了基于window时间的流式处理。总之,Structured Streaming提供了快速、可扩展、高可用、高可靠的流式处理。
Structured Streaming构建于sparksql引擎之上,可以用处理静态数据的方式去处理你的流计算,随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等,计算的执行也是基于优化后的sparksql引擎。通过checkpointing and Write Ahead Logs该系统可以保证点对点,一次处理,容错担保。
到此,相信大家对“Spark的基础知识点有哪些”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!