重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Spark-sql如何创建外部分区表

这篇文章主要为大家展示了“Spark-sql如何创建外部分区表”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Spark-sql如何创建外部分区表”这篇文章吧。

创新互联是专业的绵竹网站建设公司,绵竹接单;提供网站建设、网站制作,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行绵竹网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

一、Spark-sql创建外部分区表

1.使用spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10  --executor-cores 2 --executor-memory 3G

2.spark-sql中创建parquet分区表

create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
) 
partitioned by(order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';

二、CSV转Parquet

代码:org.apache.spark.ConvertToParquet.scala

package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
* CSV 转换为 parquet
* 参数:输入路径, 输出路径, 分区数
*/
object ConvertToParquet{
def main(args: Array[String]) {
if(args.length != 3){
println("jar args: inputFiles outPath numpartitions")
System.exit(0)
}
val inputPath = args(0)
val outPath = args(1)
val numPartitions = args(2).toInt

println("==========================================")
println("=========input: "+ inputPath )
println("=========output: "+ outPath )
println("==numPartitions: "+ numPartitions )
println("==========================================")

//判断输出目录是否存在,存在则删除
val fo = HDFSOperation(new Configuration())
val existDir = fo.existDir(outPath)
if(existDir) {
println("HDFS exists outpath: " + outPath)
println("start to delete ...")
val isDelete = fo.deleteDir(outPath)
if(isDelete){
println(outPath +" delete done. ")
}
}

val conf = new SparkConf()
val sc = new SparkContext(conf) //参数SparkConf创建SparkContext,
val sqlContext = new SQLContext(sc) //参数SparkContext创建SQLContext

val schema = StructType(Array(
StructField("bill_num",DataTypes.StringType,false),
StructField("logis_id",DataTypes.StringType,false),
StructField("store_id",DataTypes.StringType,false),
StructField("store_code",DataTypes.StringType,false),
StructField("creater_id",DataTypes.StringType,false),
StructField("order_status",DataTypes.IntegerType,false),
StructField("pay_status",DataTypes.IntegerType,false),
StructField("order_require_varieties",DataTypes.IntegerType,false),
StructField("order_require_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_amount",DataTypes.createDecimalType(19,4),false),
StructField("order_rec_gpf",DataTypes.createDecimalType(19,4),false),
StructField("deli_fee",DataTypes.FloatType,false),
StructField("order_type",DataTypes.IntegerType,false),
StructField("last_modify_time",DataTypes.TimestampType,false),
StructField("order_submit_time",DataTypes.TimestampType,false),
StructField("order_submit_date",DataTypes.DateType,false)))

convert(sqlContext, inputPath, schema, outPath, numPartitions)
}

//CSV转换为parquet
def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {
// 将text导入到DataFrame
val df = sqlContext.read.format("com.databricks.spark.csv").
schema(schema).option("delimiter", ",").load(inputpath)
// 转换为parquet
// df.write.parquet(outpath) // 转换时以block数为分区数
df.coalesce(numPartitions).write.parquet(outpath) //自定义分区数
}

}
打包后jar上传至本地目录:
/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar

事先在HDFS上生成CSV文件,HDFS目录:
/test/spark/convert/data/order/2016-05-01/

执行命令:
spark-submit --queue spark --master yarn --num-executors 10  --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar  /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01

pom.xml相关内容:

1.依赖包:




        com.ecfront
        ez-fs
        0.9
    



        org.apache.spark
        spark-core_2.10
        1.6.1
    

    
        org.apache.spark
        spark-sql_2.10
        1.6.1
    



        com.databricks
        spark-csv_2.11
        1.4.0
    



        org.apache.hadoop
        hadoop-client
        2.6.0
    

    2.plugins(含打入依赖包)


    
        
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.1
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                2.0.2
            
            
                org.apache.maven.plugins
                maven-shade-plugin
                1.4
                
                    
                        
                            *:*
                            
                                META-INF/*.SF
                                META-INF/*.DSA
                                META-INF/*.RSA
                            
                        
                    
                
            
        
    
    
        
            net.alchim31.maven
            scala-maven-plugin
            
                
                    scala-compile-first
                    process-resources
                    
                        add-source
                        compile
                    
                
                
                    scala-test-compile
                    process-test-resources
                    
                        testCompile
                    
                
            
        
        
            org.apache.maven.plugins
            maven-compiler-plugin
            
                
                    compile
                    
                        compile
                    
                
            
        
        
            org.apache.maven.plugins
            maven-shade-plugin
            1.4
            
                true
            
            
                
                    package
                    
                        shade
                    
                    
                        
                            
                            
                            org.apache.spark.ConvertToParquet
                            
                        
                    
                
            
        
    

三、表添加分区

spark-sql下执行

alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');

可通过sql查询到相应数据:

select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;

Spark-sql如何创建外部分区表

以上是“Spark-sql如何创建外部分区表”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!


当前名称:Spark-sql如何创建外部分区表
转载源于:http://cqcxhl.cn/article/gdjeii.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP