This article walks through creating an external partitioned table with Spark-SQL: defining the table, converting CSV data to Parquet, and registering the partition so it can be queried.
I. Creating an External Partitioned Table with Spark-SQL
1. Launch spark-sql:
spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G
2. Create a Parquet partitioned table in spark-sql:
create external table pgls.convert_parq (
  bill_num string,
  logis_id string,
  store_id string,
  store_code string,
  creater_id string,
  order_status INT,
  pay_status INT,
  order_require_varieties INT,
  order_require_amount decimal(19,4),
  order_rec_amount decimal(19,4),
  order_rec_gpf decimal(19,4),
  deli_fee FLOAT,
  order_type INT,
  last_modify_time timestamp,
  order_submit_time timestamp
)
partitioned by (order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';
II. Converting CSV to Parquet
Code: org.apache.spark.ConvertToParquet.scala
package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
 * Converts CSV to Parquet.
 * Arguments: input path, output path, number of partitions
 */
object ConvertToParquet {
  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numPartitions")
      System.exit(0)
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt
    println("==========================================")
    println("=========input: " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists on HDFS, delete it first
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done. ")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)      // create the SparkContext from SparkConf
    val sqlContext = new SQLContext(sc)  // create the SQLContext from the SparkContext
    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))
    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // CSV to Parquet
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // Load the CSV text into a DataFrame with the explicit schema
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema).option("delimiter", ",").load(inputpath)
    // Write as Parquet with a user-chosen number of output partitions;
    // df.write.parquet(outpath) alone would use one partition per input block
    df.coalesce(numPartitions).write.parquet(outpath)
  }
}
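As a quick sanity check before launching the job, it can be worth verifying that each CSV line carries the 16 fields the schema above declares, since every column is marked non-nullable. The helper below is illustrative only (not part of the original job); the field count comes from the schema in the code above:

```scala
object CsvSanityCheck {
  // The schema declares 16 fields, so every CSV line must contain
  // exactly 16 comma-separated values (empty fields still count).
  val expectedFieldCount = 16

  // split with limit -1 keeps trailing empty fields
  def isWellFormed(line: String): Boolean =
    line.split(",", -1).length == expectedFieldCount
}
```

Running this check over a sample of the input before a long Spark run can catch malformed exports early.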
After packaging, upload the jar to a local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar. Generate the CSV files on HDFS beforehand, under: /test/spark/convert/data/order/2016-05-01/. Then run:
spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01
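Note that the output path in the command above ends in order_submit_date=2016-05-01: Hive-style partitioned tables expect each partition's data under a &lt;column&gt;=&lt;value&gt; subdirectory of the table location. A small hypothetical helper (not part of the original job) makes the convention explicit:

```scala
object PartitionPath {
  // Joins a table location and one partition column/value into the
  // Hive-style <location>/<column>=<value> directory the table expects.
  def forPartition(tableLocation: String, column: String, value: String): String =
    s"${tableLocation.stripSuffix("/")}/$column=$value"
}
```

For example, forPartition("/test/spark/convert/parquet/bill_parq/", "order_submit_date", "2016-05-01") yields exactly the output path passed to spark-submit above.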
Relevant parts of pom.xml:
1. Dependencies:
<dependency>
  <groupId>com.ecfront</groupId>
  <artifactId>ez-fs</artifactId>
  <version>0.9</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-csv_2.11</artifactId>
  <version>1.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.0</version>
</dependency>

(Note: spark-core and spark-sql here are the Scala 2.10 builds while spark-csv is the _2.11 artifact; on a Scala 2.10 build the matching artifact would be spark-csv_2.10.)
2. Plugins (including shading the dependencies into the jar):
<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>1.4</version>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
III. Adding the Partition to the Table
Run in spark-sql:
alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');
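When backfilling many days, issuing one statement per date by hand gets tedious. A small sketch (illustrative, not from the original article; it assumes Java 8's java.time is available) that generates one ADD PARTITION statement per day in a range:

```scala
import java.time.LocalDate

object AddPartitionStatements {
  // Produces one ALTER TABLE ... ADD PARTITION statement per day in [start, end].
  // "if not exists" makes re-running safe; the table and column names here
  // follow the article, but any partitioned table works the same way.
  def forRange(table: String, column: String, start: LocalDate, end: LocalDate): Seq[String] =
    Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(!_.isAfter(end))
      .map(d => s"alter table $table add if not exists partition($column='$d')")
      .toSeq
}
```

Each generated statement can then be executed in spark-sql (or via sqlContext.sql) to register the corresponding partition directory.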
The data can then be queried with SQL:
select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;