Spark

  • 2020-11-13 15:14:40
  • Post By 高景洋
spark如何将rdd转换成dataframe? ***注意点***:RDD中的每条数据,一定要结构统一。不然会报以下错误: ValueError: Length of object (3) does not match with length of fields (4) 注意场景:我们在hbase里的数据结构不统一,如有些数据有 JobHistory 列,但是有的没有。所以,当我们把数据从Hbase读出来后,进行 toDF 操作,报错。 下边为正常的Rdd转dataframe 示例: from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, St
查看全文 | 浏览次数(1639)
  • 2020-10-28 13:25:08
  • Post By 高景洋
pyspark连接hbase报org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter 1、在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包 2、下载jar包spark-examples_2.11-1.6.0-typesafe-001.jar 下载地址:https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001 3、解压文件,并将解压后的文件移至 /usr/local/spark3.0.1/examples/jars 目录下 4、执行 cd /usr/local/spark3.0.1/conf 目录 5、打开 sp
查看全文 | 浏览次数(1488)
  • 2020-10-25 14:20:58
  • Post By 高景洋
action常用算子: collect count take reduce saveAsTextFile foeach 示例代码: def my_action(): data = [1,2,3,4,5,6,7,8,9,10] rdd1 = sc.parallelize(data) print(rdd1.collect()) print(''最大值:{}''.format(rdd1.max())) print(''最小值:{}''.format(rdd1.min())) print(''求和:{}''.format(rdd1.sum())) print(''相邻的两个相加:{}''.format(rdd1.reduce(lambda a,b:a+b))) print(''元素总数:{}''.format(rdd1.count())) p
查看全文 | 浏览次数(1087)
  • 2020-10-25 14:07:52
  • Post By 高景洋
join: 关联查询 Spark 中的Rdd关联可以通过 4种方式 1、Join 2、leftOuterJoin 3、rightOuterJoin 4、fullOuterJoin 示例代码: def my_join(): rdd_a = sc.parallelize([(''A'',''a1''),(''C'',''c1''),(''D'',''d1''),(''F'',''f1''),(''F'',''f2'')]) rdd_b = sc.parallelize([(''A'',''a2''),(''C'',''c2''),(''C'',''c3''),(''E'',''e1'')]) result = rdd_a.join(rdd_b) print(result.collect()) result_left_join = rdd_a.left
查看全文 | 浏览次数(1750)
  • 2020-10-25 13:31:34
  • Post By 高景洋
distinct: Rdd数据去重 示例代码: def my_distinct(): data = [1,2,3,4,5] rdd1 =sc.parallelize(data) data2 = [2,3,4,9,10] rdd2 = sc.parallelize(data2) result = rdd1.union(rdd2).distinct() print(result.collect()) 输出结果: [10, 1, 2, 3, 4, 5, 9]
查看全文 | 浏览次数(1181)
  • 2020-10-25 13:26:51
  • Post By 高景洋
union: 合并两个RDD 示例代码: def my_union(): data = [1,2,3,4,5] rdd1 =sc.parallelize(data) data2 = [6,7,8,9,10] rdd2 = sc.parallelize(data2) result = rdd1.union(rdd2) print(result.collect()) 输出结果: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
查看全文 | 浏览次数(1417)
  • 2020-10-25 11:32:54
  • Post By 高景洋
sortByKey: 按Key进行排序 示例代码: def my_sortByKey(): data = [''hello spark'', ''hello world'', ''hello world''] rdd = sc.parallelize(data) map_rdd = rdd.flatMap(lambda line: line.split('' '')).map(lambda x: (x, 1)) reduce_by_key = map_rdd.reduceByKey(lambda a,b:a+b) sort_by_rdd = reduce_by_key.map(lambda x:(x[1],x[0])) print(sort_by_rdd.sortByKey().map(lambda x:(x[1],x[0])).collect()) 输出
查看全文 | 浏览次数(1275)
  • 2020-10-25 10:50:56
  • Post By 高景洋
reduceByKey: 按key 分组,并进行计算 示例代码: 统计RDD中,每个单词出现的次数: def my_reduceByKey(): data = [''hello spark'', ''hello world'', ''hello world''] rdd = sc.parallelize(data) map_rdd = rdd.flatMap(lambda line: line.split('' '')).map(lambda x: (x, 1)) reduce_by_key = map_rdd.reduceByKey(lambda a,b:a+b) print(reduce_by_key.collect()) 输出结果: [(''hello'', 3), (''spark'', 1), (''world'', 2)]
查看全文 | 浏览次数(1483)
  • 2020-10-25 10:39:47
  • Post By 高景洋
groupByKey(): 分组统计 示例代码: def my_groupByKey(): data = [''hello spark'', ''hello world'', ''hello world''] rdd = sc.parallelize(data) map_rdd = rdd.flatMap(lambda line:line.split('' '')).map(lambda x:(x,1)) group_by_rdd = map_rdd.groupByKey() print(group_by_rdd.collect()) result_rdd = group_by_rdd.map(lambda x:{x[0]:list(x[1])}) print(result_rdd.collect()) 输出结果: (''hello'', <pyspark.
查看全文 | 浏览次数(1407)
  • 2020-10-25 10:19:18
  • Post By 高景洋
flatmap(func): 1、输入的item能够map到0 或 多个输出,返回值是一个Sequence 示例代码: 实现rdd中的每个数据,按 空格 拆分,并输出 def my_flatMap(): data = [''hello spark'',''hello world'',''hello world''] rdd = sc.parallelize(data) rdd2 = rdd.flatMap(lambda line:line.split('' '')).collect() print(rdd2) 输出结果: [''hello'', ''spark'', ''hello'', ''world'', ''hello'', ''world'']
查看全文 | 浏览次数(1504)
  1. 1
  2. 2
  3. 3