Spark

  • 2021-07-22 15:02:30
  • Post By 高景洋
要通过 pyspark 将dataframe中的某一列值转换为小写,需要如何操作? ps: 要通过pyspark 对dataframe 中的值做类型转换 或 值处理,不要去想python的处理方法,spark中有自己数据类型,也有自己的值处理方法。
查看全文 | 浏览次数(2189)
  • 2021-02-02 14:30:23
  • Post By 高景洋
# 提交方式(测试) # spark-submit --master local[2] --num-executors 2 --executor-memory 1G --jars ./spark-examples_2.11-1.6.0-typesafe-001.jar /home/hadoop/script/test_hbase_dataframe.py # 打包 # zip -r collect_py.zip * # ---------------提交方式(正式) : 适用于Python3.7 和 Spark2.3+------------------- # spark-submit \ # --name hbase_scan \ # --py-files /home/hadoop/collect_py/collect_py.zip \ #
查看全文 | 浏览次数(2017)
  • 2021-01-29 18:37:26
  • Post By 高景洋
首先,我们要了解几个spark的内置时间比较函数: current_date:获取当前日期,示例:2021-1-29 date_sub:在指定日期上减N天,返回值是一个dataframe Row ,所以这个函数是在过滤时,对dataframe列进行操作 date_add:在指定日期上加N天 ,注意点同date_sub 下边的示例,功能为: 过滤出 UpdatedDate 是昨天的数据,且小于今天的日期 说明: 1、今天是 2021-1-29号,则取出 2021-1-28 - 2021-1-29 之间的数据 2、用WebsiteID 作为汇总字段,进行count操作 3、返回结果为 dataframe from pyspark.sql.functions import to_timestamp,current_date,date_sub
查看全文 | 浏览次数(2153)
  • 2021-01-29 18:27:52
  • Post By 高景洋
直接上代码: from pyspark import SparkContext,SparkConf conf = SparkConf() sc = SparkContext(conf=conf) list_url_group_data = ListUrlDA().select_list_url_count_group_by_websiteid(list_schedule_website_id) #从mysql读出来的数据 类型 List list_url_rdd = sc.parallelize(list_url_group_data) # 将List转换为rdd spark = SparkSession.builder.master("local").appName("SparkMysql").getOrCreate() schem
查看全文 | 浏览次数(1245)
  • 2021-01-28 11:38:26
  • Post By 高景洋
业务需求: 1、通过pyspark将hbase中的数据拉出 2、通过pyspark按UpdatedDate 、EnteredDate、DeletedDate ,根据WebsiteID字段汇总数量 小白方法: 1、spark拉出hbase数据 2、rdd1 = hbase_result_rdd.map(''对日期字段进行处理'') df1 = rdd_to_df(rdd1) # 将rdd转换为dataframe df2 = df1.filter(df1[''UpdatedDate'']>datetime.datetime.today().date) # 理想中的样子 现实中的样子:各种日期类型转换问题报错,如:数据中的字段值为None \ 2021-01-22 18:47:48 \ 2021-01-22T18:47:48 大白方法: 1、
查看全文 | 浏览次数(1587)
  • 2021-01-13 16:30:50
  • Post By 高景洋
pyspark中要给rdd增加一列新数据,请看下边的代码。 from pyspark.sql import Row def add_field(row,refresh_date): tmp = row.asDict() tmp[''RefreshDate''] = refresh_date tmp[''RandomKey''] = uuid.uuid1().hex return Row(**tmp) if __name__ == ''__main__'': rdd_data= hbase_rdd.map(lambd m:add_field(m[1],refresh_date)).collect()
查看全文 | 浏览次数(3492)
  • 2020-12-11 11:44:35
  • Post By 高景洋
from pyspark.sql.functions import lit,rand df = spark.read.csv(''file:///Users/jasongao/Documents/tmp/hbase-0.csv'',schema) # 原始DF df11 = df.withColumn(''RandomKey'',rand()) # 给df 增加 RandomKey 随机列,并生成新的DF df11.orderBy(df11[''RandomKey'']).show() # 按新增列正排序 输出显示 PS : 新增列时,优先使用 # from pyspark.sql.functions # 下的方法,尽量避免通过用户自定义函数实现列内容增加
查看全文 | 浏览次数(1297)
  • 2020-12-04 13:13:00
  • Post By 高景洋
1、Hbase中数据列并不是统一的 2、如果在列不统一的情况下,将数据通过spark从hbase读出后,直接转dataframe会报错 3、操作dataframe的方便性比操作rdd好很多,因此我们需要想办法,把字段不统一的rdd转换为dataframe 具体逻辑请看以下代码,亲测可用: from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession import json def deal_missing_dec(no_row_key_colnames): def deal_missing(x): result = {} for i in no_row_key_colnames: if i in x[1].keys():
查看全文 | 浏览次数(1890)
  • 2020-11-18 15:05:15
  • Post By 高景洋
查看全文 | 浏览次数(1108)
  • 2020-11-13 15:33:11
  • Post By 高景洋
下边用示例代码,给大家做个演示,并针输出结果截图。 from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.types import StructField, StructType, StringType if __name__ == ''__main__'': spark = SparkSession.builder.master("local").appName("SparkOnHive").getOrCreate()#.enableHiveSupport() schema = StructType([ # true代表不为空 StructField("WebsiteID", StringType()
查看全文 | 浏览次数(1472)
  1. 1
  2. 2
  3. 3