• 2021/2/2 14:30:23
  • Post By 高景洋
# 提交方式(测试) # spark-submit --master local[2] --num-executors 2 --executor-memory 1G --jars ./spark-examples_2.11-1.6.0-typesafe-001.jar /home/hadoop/script/test_hbase_dataframe.py # 打包 # zip -r collect_py.zip * # ---------------提交方式(正式) : 适用于Python3.7 和 Spark2.3+------------------- # spark-submit \ # --name hbase_scan \ # --py-files /home/hadoop/collect_py/collect_py.zip \ #
查看全文 | 浏览次数(56)
  • 2021/1/29 18:37:25
  • Post By 高景洋
首先,我们要了解几个spark的内置时间比较函数: current_date:获取当前日期,示例:2021-1-29 date_sub:在指定日期上减N天,返回值是一个dataframe Row ,所以这个函数是在过滤时,对dataframe列进行操作 date_add:在指定日期上加N天 ,注意点同date_sub 下边的示例,功能为: 过滤出 UpdatedDate 是昨天的数据,且小于今天的日期 说明: 1、今天是 2021-1-29号,则取出 2021-1-28 - 2021-1-29 之间的数据 2、用WebsiteID 作为汇总字段,进行count操作 3、返回结果为 dataframe from pyspark.sql.functions import to_timestamp,current_date,date_sub
查看全文 | 浏览次数(49)
  • 2021/1/29 18:27:52
  • Post By 高景洋
直接上代码: from pyspark import SparkContext,SparkConf conf = SparkConf() sc = SparkContext(conf=conf) list_url_group_data = ListUrlDA().select_list_url_count_group_by_websiteid(list_schedule_website_id) #从mysql读出来的数据 类型 List list_url_rdd = sc.parallelize(list_url_group_data) # 将List转换为rdd spark = SparkSession.builder.master("local").appName("SparkMysql").getOrCreate() schem
查看全文 | 浏览次数(46)
  • 2021/1/28 11:38:26
  • Post By 高景洋
业务需求: 1、通过pyspark将hbase中的数据拉出 2、通过pyspark按UpdatedDate 、EnteredDate、DeletedDate ,根据WebsiteID字段汇总数量 小白方法: 1、spark拉出hbase数据 2、rdd1 = hbase_result_rdd.map(''对日期字段进行处理'') df1 = rdd_to_df(rdd1) # 将rdd转换为dataframe df2 = df1.filter(df1[''UpdatedDate'']>datetime.datetime.today().date) # 理想中的样子 现实中的样子:各种日期类型转换问题报错,如:数据中的字段值为None \ 2021-01-22 18:47:48 \ 2021-01-22T18:47:48 大白方法: 1、
查看全文 | 浏览次数(47)
  • 2021/1/13 16:30:50
  • Post By 高景洋
pyspark中要给rdd增加一列新数据,请看下边的代码。 from pyspark.sql import Row def add_field(row,refresh_date): tmp = row.asDict() tmp[''RefreshDate''] = refresh_date tmp[''RandomKey''] = uuid.uuid1().hex return Row(**tmp) if __name__ == ''__main__'': rdd_data= hbase_rdd.map(lambd m:add_field(m[1],refresh_date)).collect()
查看全文 | 浏览次数(85)
  • 2020/12/11 11:44:35
  • Post By 高景洋
from pyspark.sql.functions import lit,rand df = spark.read.csv(''file:///Users/jasongao/Documents/tmp/hbase-0.csv'',schema) # 原始DF df11 = df.withColumn(''RandomKey'',rand()) # 给df 增加 RandomKey 随机列,并生成新的DF df11.orderBy(df11[''RandomKey'']).show() # 按新增列正排序 输出显示 PS : 新增列时,优先使用 # from pyspark.sql.functions # 下的方法,尽量避免通过用户自定义函数实现列内容增加
查看全文 | 浏览次数(137)
  • 2020/12/4 13:12:59
  • Post By 高景洋
1、Hbase中数据列并不是统一的 2、如果在列不统一的情况下,将数据通过spark从hbase读出后,直接转dataframe会报错 3、操作dataframe的方便性比操作rdd好很多,因此我们需要想办法,把字段不统一的rdd转换为dataframe 具体逻辑请看以下代码,亲测可用: from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession import json def deal_missing_dec(no_row_key_colnames): def deal_missing(x): result = {} for i in no_row_key_colnames: if i in x[1].keys():
查看全文 | 浏览次数(132)
  • 2020/11/18 15:05:15
  • Post By 高景洋
查看全文 | 浏览次数(138)
  • 2020/11/13 15:33:10
  • Post By 高景洋
下边用示例代码,给大家做个演示,并针输出结果截图。 from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, StructType, StringType if __name__ == ''__main__'': spark = SparkSession.builder.master("local").appName("SparkOnHive").getOrCreate()#.enableHiveSupport() schema = StructType([ # true代表不为空 StructField("WebsiteID", StringType()
查看全文 | 浏览次数(173)
  • 2020/11/13 15:14:39
  • Post By 高景洋
spark如何将rdd转换成dataframe? ***注意点***:RDD中的每条数据,一定要结构统一。不然会报以下错误: ValueError: Length of object (3) does not match with length of fields (4) 注意场景:我们在hbase里的数据结构不统一,如有些数据有 JobHistory 列,但是有的没有。所以,当我们把数据从Hbase读出来后,进行 toDF 操作,报错。 下边为正常的Rdd转dataframe 示例: from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, St
查看全文 | 浏览次数(132)
相关文章
友情链接
支付宝打赏(内容对您有帮助的话)
微信交流
知乎编程经验网 - 在工作中总结编程的经验! 备案/许可证编号:鲁ICP备11020152号
QQ:120217215 联系电话:15192695151