spark2_dfanalysis

Dataframe analysis in PySpark

udf_date_time
In [166]:
# My Standard Spark Session!

# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
from itertools import product
import random   # used below by random.choice when building the 'make' column

# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None
pd.options.display.max_colwidth = 80


# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields","true")
In [2]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
#  %reload_ext autoreload


# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)


from shared.app_context import *
from builder.DataFrameBuild import *

ctx = ApplicationContext("Dev-Job")

DFB = DataFrameBuild(ctx.spark)

print("%16s  %s" % ("Python Version:",sys.version))
print("%16s  %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s  %s" % ("My Python Libs:",my_library))
print("%16s  %s" % ("My Spark Dir:",my_spark))
print("%16s  %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
 Python Version:  3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:25:24) [MSC v.1900 64 bit (AMD64)]
    Python Path:  C:\Users\d810216\AppData\Local\conda\conda\envs\my_root
 My Python Libs:  C:\Users\d810216/.myconfigs
   My Spark Dir:  C:\Users\d810216/spark2_dfanalysis
   My Spark Ctx:  <pyspark.sql.session.SparkSession object at 0x00000283977B43C8>

Here are our sample dataframes:

df1: 25,000 car sales, each with a vin, division, date/time fields (month, day, year, hour, minute, second), and a price

df2: the same 25,000 cars, with just the make

Let's join them and then apply some UDFs (user defined functions) to the result.

In [127]:
num = 25000

df1 = DFB.arrays_to_dataframe(
    [[int(x) for x in np.linspace(1,num,num)],
     DFB.build_array("string",num=num,width=3),
     DFB.build_array("integer",num=num,nrange=(100,999)),
     DFB.build_array("integer",num=num,nrange=(1,12)),
     DFB.build_array("integer",num=num,nrange=(1,28)),
     DFB.build_array("integer",num=num,nrange=(2010,2019)),
     DFB.build_array("integer",num=num,nrange=(0,23)),
     DFB.build_array("integer",num=num,nrange=(0,59)),
     DFB.build_array("integer",num=num,nrange=(0,59)),
     DFB.build_array("double",num=num,nrange=(1000,100000))],
    ['index','vin','division','month','day','year','hour','minute','second','price'])


lst_cars = [random.choice(['Honda','Toyota','Chevy','Ford','Tesla','Volkswagon','Hyundai','Jeep']) for x in range(num)]


df2 = DFB.arrays_to_dataframe(
    [[int(x) for x in np.linspace(1,num,num)],
    lst_cars],
    ['index','make'])

Join them on the index column!

In [128]:
df_sales = df1.alias("a")\
.join(df2.alias("b"),col("a.index") == col("b.index"),"inner")\
.drop(col("b.index"))
In [129]:
print(df_sales.count())
25000
In [130]:
df_sales.limit(8).toPandas()
Out[130]:
index vin division month day year hour minute second price make
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep
4 1677 lwp 625 5 22 2011 8 1 39 61,863.09 Volkswagon
5 1697 wvs 726 3 27 2019 12 8 32 65,485.91 Tesla
6 1806 zlp 516 5 24 2018 17 40 4 97,498.34 Hyundai
7 1950 gjk 570 1 24 2013 1 4 21 50,618.46 Volkswagon
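An aside: because both frames share the "index" column, the same inner join can also be written by passing the column name, which keeps a single index column without the alias/drop step. A sketch, not run here:

# equivalent inner join on the shared "index" column; Spark keeps one copy of it
df_sales_alt = df1.join(df2, on="index", how="inner")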

Create a unique identifier column

In [131]:
unique_id = [col('make'),lit('_'),col('vin'), lit('_'), col('year')]

df_id = df_sales.withColumn('key',concat(*unique_id))
In [132]:
df_id.limit(8).toPandas()
Out[132]:
index vin division month day year hour minute second price make key
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014
4 1677 lwp 625 5 22 2011 8 1 39 61,863.09 Volkswagon Volkswagon_lwp_2011
5 1697 wvs 726 3 27 2019 12 8 32 65,485.91 Tesla Tesla_wvs_2019
6 1806 zlp 516 5 24 2018 17 40 4 97,498.34 Hyundai Hyundai_zlp_2018
7 1950 gjk 570 1 24 2013 1 4 21 50,618.46 Volkswagon Volkswagon_gjk_2013
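The same key can also be built with the built-in concat_ws, which inserts the separator for you. A sketch against df_sales, not run here:

# concat_ws('_', ...) joins the columns with '_', so the lit('_') pieces go away
df_id_alt = df_sales.withColumn('key', concat_ws('_', col('make'), col('vin'), col('year')))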

Create some Date columns

For this, I plan to use some UDFs (user defined functions).

In [205]:
# UDFs with lambdas!


# functions:
def leading_zeroes(x,width=2):
    return str(x).zfill(width)

def nohyphens(z):
    return re.sub("-","",str(z))

def date_from_str_mdY(z):
    return datetime.strptime(z,'%m-%d-%Y')

def datetime_from_str_mdY_time(z):
    return datetime.strptime(z,'%m-%d-%Y %H:%M:%S')
    # dt = datetime.strptime("21/11/06 16:30", "%d/%m/%y %H:%M")


# udf's:
udf_leading_zeroes2 = udf(lambda z: leading_zeroes(z,2),StringType())
udf_leading_zeroes4 = udf(lambda z: leading_zeroes(z,4),StringType())
udf_leading_zeroes6 = udf(lambda z: leading_zeroes(z,6),StringType())

# NB: the dual use of "nohyphens" below: one UDF returns the string, the other casts it to an integer
udf_nohyphen_str = udf(lambda z: nohyphens(z),StringType())
udf_nohyphen_int = udf(lambda z: int(nohyphens(z)),IntegerType())

# date, datetime:
udf_date_from_str_mdY = udf(lambda z: date_from_str_mdY(z),DateType())
udf_datetime_from_str_mdY_time = udf(lambda z: datetime_from_str_mdY_time(z),TimestampType())
In [198]:
print(leading_zeroes('67',6))
000067
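For reference, the zero-padding itself doesn't strictly need a UDF: the built-in lpad pads a string column to a fixed width. A sketch, not used in the steps below:

# lpad pads the string form of "month" to width 2 with '0'
df_id.withColumn("month-2", lpad(col("month").cast("string"), 2, "0"))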

Create a DateType with UDF

All at once

In [182]:
# just add date column

df_finaldate = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("date-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4"))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))\
.drop("month-2","day-2","year-4","date-hyphen")
In [183]:
# df_finaldate.printSchema()
df_finaldate.limit(4).toPandas()
Out[183]:
index vin division month day year hour minute second price make key date
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 2010-10-15
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 2013-11-15
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 2012-05-08
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 2014-10-12

Step by step

In [184]:
# let's format month-day-year as a string like 08-24-2012, then derive string, integer, and date columns from it
df_dated1 = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("year-6",udf_leading_zeroes6(col("year")))
In [185]:
df_dated1.limit(4).toPandas()
Out[185]:
index vin division month day year hour minute second price make key month-2 day-2 year-4 year-6
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 10 15 2010 002010
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 11 15 2013 002013
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 05 08 2012 002012
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 10 12 2014 002014
In [186]:
df_dated2 = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("date-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4"))\
.drop("month-2","day-2","year-4")
In [187]:
df_dated2.limit(4).toPandas()
Out[187]:
index vin division month day year hour minute second price make key date-hyphen
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 10-15-2010
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 11-15-2013
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 05-08-2012
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 10-12-2014
In [188]:
# let's remove the hyphens now and get a string or integer back
df_nohyphen = df_dated2\
.withColumn("int-date",udf_nohyphen_int(col("date-hyphen")))\
.withColumn("str-date",udf_nohyphen_str(col("date-hyphen")))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))
In [189]:
df_nohyphen.printSchema()
root
 |-- index: long (nullable = true)
 |-- vin: string (nullable = true)
 |-- division: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- year: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- minute: long (nullable = true)
 |-- second: long (nullable = true)
 |-- price: double (nullable = true)
 |-- make: string (nullable = true)
 |-- key: string (nullable = true)
 |-- date-hyphen: string (nullable = true)
 |-- int-date: integer (nullable = true)
 |-- str-date: string (nullable = true)
 |-- date: date (nullable = true)

In [190]:
df_nohyphen.limit(4).toPandas()
Out[190]:
index vin division month day year hour minute second price make key date-hyphen int-date str-date date
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 10-15-2010 10152010 10152010 2010-10-15
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 11-15-2013 11152013 11152013 2013-11-15
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 05-08-2012 5082012 05082012 2012-05-08
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 10-12-2014 10122014 10122014 2014-10-12
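Stripping the hyphens can also be done without a UDF via the built-in regexp_replace, with a cast for the integer form. A sketch against df_dated2, not run here:

# regexp_replace drops the hyphens; cast("integer") gives the numeric form
df_dated2.withColumn("str-date", regexp_replace(col("date-hyphen"), "-", ""))\
         .withColumn("int-date", regexp_replace(col("date-hyphen"), "-", "").cast("integer"))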
In [199]:
# same as above, but drop the intermediate columns and keep only the date
df_date = df_dated2\
.withColumn("int-date",udf_nohyphen_int(col("date-hyphen")))\
.withColumn("str-date",udf_nohyphen_str(col("date-hyphen")))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))\
.drop("int-date","str-date","date-hyphen")
In [200]:
df_date.limit(4).toPandas()
Out[200]:
index vin division month day year hour minute second price make key date
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 2010-10-15
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 2013-11-15
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 2012-05-08
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 2014-10-12
In [201]:
df_date.printSchema()
root
 |-- index: long (nullable = true)
 |-- vin: string (nullable = true)
 |-- division: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- year: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- minute: long (nullable = true)
 |-- second: long (nullable = true)
 |-- price: double (nullable = true)
 |-- make: string (nullable = true)
 |-- key: string (nullable = true)
 |-- date: date (nullable = true)
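Worth noting: on Spark 2.2+ the built-in to_date parses the same pattern directly, so the date column could be produced without a UDF. A sketch against df_dated2, not run here:

# to_date(column, format) parses "MM-dd-yyyy" strings into a DateType column
df_dated2.withColumn("date", to_date(col("date-hyphen"), "MM-dd-yyyy"))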

Create a TimestampType with UDF

In [215]:
# just add timestamp column

df_finaldatetime = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("hour-2",udf_leading_zeroes2(col("hour")))\
.withColumn("minute-2",udf_leading_zeroes2(col("minute")))\
.withColumn("second-2",udf_leading_zeroes2(col("second")))\
.withColumn("datetime-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4",
                                     lit(" "),"hour-2",lit(":"),"minute-2",lit(":"),"second-2"))\
.withColumn("datetime",udf_datetime_from_str_mdY_time(col("datetime-hyphen")))\
.drop("month-2","day-2","year-4","datetime-hyphen","hour-2","minute-2","second-2")
In [216]:
df_finaldatetime.printSchema()
root
 |-- index: long (nullable = true)
 |-- vin: string (nullable = true)
 |-- division: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- year: long (nullable = true)
 |-- hour: long (nullable = true)
 |-- minute: long (nullable = true)
 |-- second: long (nullable = true)
 |-- price: double (nullable = true)
 |-- make: string (nullable = true)
 |-- key: string (nullable = true)
 |-- datetime: timestamp (nullable = true)

In [217]:
df_finaldatetime.limit(4).toPandas()
Out[217]:
index vin division month day year hour minute second price make key datetime
0 26 nab 105 10 15 2010 13 51 20 15,714.75 Jeep Jeep_nab_2010 2010-10-15 13:51:20
1 29 tpo 862 11 15 2013 21 47 5 77,844.98 Ford Ford_tpo_2013 2013-11-15 21:47:05
2 474 dow 864 5 8 2012 19 40 42 34,483.40 Tesla Tesla_dow_2012 2012-05-08 19:40:42
3 964 gxa 913 10 12 2014 14 16 4 3,115.38 Jeep Jeep_gxa_2014 2014-10-12 14:16:04
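Similarly, on Spark 2.2+ the built-in to_timestamp could replace the timestamp UDF once the formatted string exists. A sketch, where df_with_string is a hypothetical name for the frame above before "datetime-hyphen" is dropped (not run here):

# to_timestamp(column, format) parses "MM-dd-yyyy HH:mm:ss" strings into a TimestampType
df_with_string.withColumn("datetime", to_timestamp(col("datetime-hyphen"), "MM-dd-yyyy HH:mm:ss"))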
In [ ]: