• 2020/11/18 15:05:15
  • Post By 高景洋
查看全文 | 浏览次数(11)
  • 2020/11/13 15:33:10
  • Post By 高景洋
下边用示例代码,给大家做个演示,并针输出结果截图。 from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, StructType, StringType if __name__ == ''__main__'': spark = SparkSession.builder.master("local").appName("SparkOnHive").getOrCreate()#.enableHiveSupport() schema = StructType([ # true代表不为空 StructField("WebsiteID", StringType()
查看全文 | 浏览次数(28)
  • 2020/11/13 15:14:39
  • Post By 高景洋
spark如何将rdd转换成dataframe? ***注意点***:RDD中的每条数据,一定要结构统一。不然会报以下错误: ValueError: Length of object (3) does not match with length of fields (4) 注意场景:我们在hbase里的数据结构不统一,如有些数据有 JobHistory 列,但是有的没有。所以,当我们把数据从Hbase读出来后,进行 toDF 操作,报错。 下边为正常的Rdd转dataframe 示例: from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, St
查看全文 | 浏览次数(25)
  • 2020/10/28 13:25:08
  • Post By 高景洋
pyspark连接hbase报org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter 1、在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包 2、下载jar包spark-examples_2.11-1.6.0-typesafe-001.jar 下载地址:https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001 3、解压文件,并将解压后的文件移至 /usr/local/spark3.0.1/examples/jars 目录下 4、执行 cd /usr/local/spark3.0.1/conf 目录 5、打开 sp
查看全文 | 浏览次数(54)
  • 2020/10/25 14:20:58
  • Post By 高景洋
action常用算子: collect count take reduce saveAsTextFile foeach 示例代码: def my_action(): data = [1,2,3,4,5,6,7,8,9,10] rdd1 = sc.parallelize(data) print(rdd1.collect()) print(''最大值:{}''.format(rdd1.max())) print(''最小值:{}''.format(rdd1.min())) print(''求和:{}''.format(rdd1.sum())) print(''相邻的两个相加:{}''.format(rdd1.reduce(lambda a,b:a+b))) print(''元素总数:{}''.format(rdd1.count())) p
查看全文 | 浏览次数(73)
  • 2020/10/25 14:07:52
  • Post By 高景洋
join: 关联查询 Spark 中的Rdd关联可以通过 4种方式 1、Join 2、leftOuterJoin 3、rightOuterJoin 4、fullOuterJoin 示例代码: def my_join(): rdd_a = sc.parallelize([(''A'',''a1''),(''C'',''c1''),(''D'',''d1''),(''F'',''f1''),(''F'',''f2'')]) rdd_b = sc.parallelize([(''A'',''a2''),(''C'',''c2''),(''C'',''c3''),(''E'',''e1'')]) result = rdd_a.join(rdd_b) print(result.collect()) result_left_join = rdd_a.left
查看全文 | 浏览次数(67)
  • 2020/10/25 13:31:34
  • Post By 高景洋
distinct: Rdd数据去重 示例代码: def my_distinct(): data = [1,2,3,4,5] rdd1 =sc.parallelize(data) data2 = [2,3,4,9,10] rdd2 = sc.parallelize(data2) result = rdd1.union(rdd2).distinct() print(result.collect()) 输出结果: [10, 1, 2, 3, 4, 5, 9]
查看全文 | 浏览次数(75)
  • 2020/10/25 13:26:51
  • Post By 高景洋
union: 合并两个RDD 示例代码: def my_union(): data = [1,2,3,4,5] rdd1 =sc.parallelize(data) data2 = [6,7,8,9,10] rdd2 = sc.parallelize(data2) result = rdd1.union(rdd2) print(result.collect()) 输出结果: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
查看全文 | 浏览次数(71)
  • 2020/10/25 11:32:54
  • Post By 高景洋
sortByKey: 按Key进行排序 示例代码: def my_sortByKey(): data = [''hello spark'', ''hello world'', ''hello world''] rdd = sc.parallelize(data) map_rdd = rdd.flatMap(lambda line: line.split('' '')).map(lambda x: (x, 1)) reduce_by_key = map_rdd.reduceByKey(lambda a,b:a+b) sort_by_rdd = reduce_by_key.map(lambda x:(x[1],x[0])) print(sort_by_rdd.sortByKey().map(lambda x:(x[1],x[0])).collect()) 输出
查看全文 | 浏览次数(85)
  • 2020/10/25 10:50:56
  • Post By 高景洋
reduceByKey: 按key 分组,并进行计算 示例代码: 统计RDD中,每个单词出现的次数: def my_reduceByKey(): data = [''hello spark'', ''hello world'', ''hello world''] rdd = sc.parallelize(data) map_rdd = rdd.flatMap(lambda line: line.split('' '')).map(lambda x: (x, 1)) reduce_by_key = map_rdd.reduceByKey(lambda a,b:a+b) print(reduce_by_key.collect()) 输出结果: [(''hello'', 3), (''spark'', 1), (''world'', 2)]
查看全文 | 浏览次数(63)
相关文章
友情链接
支付宝打赏(内容对您有帮助的话)
微信交流
知乎编程经验网 - 在工作中总结编程的经验! 备案/许可证编号:鲁ICP备11020152号
QQ:120217215 联系电话:15192695151