In [166]:
# My Standard Spark Session!
# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import random
import functools
import operator
from itertools import product
# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None
pd.options.display.max_colwidth = 80
# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName("myapp").getOrCreate()
# spark = SparkSession.builder.master("yarn")\
# .config("spark.executor.instances", "32")\
# .config("spark.executor.cores", "4")\
# .config("spark.executor.memory", "4G")\
# .config("spark.driver.memory", "4G")\
# .config("spark.executor.memoryOverhead","4G")\
# .config("spark.yarn.queue","Medium")\
# .appName("myapp")\
# .getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields","true")
In [2]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
# %reload_ext autoreload
# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)
from shared.app_context import *
from builder.DataFrameBuild import *
ctx = ApplicationContext("Dev-Job")
DFB = DataFrameBuild(ctx.spark)
print("%16s %s" % ("Python Version:",sys.version))
print("%16s %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s %s" % ("My Python Libs:",my_library))
print("%16s %s" % ("My Spark Dir:",my_spark))
print("%16s %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
In [127]:
num = 25000
df1 = DFB.arrays_to_dataframe(
[[int(x) for x in np.linspace(1,num,num)],
DFB.build_array("string",num=num,width=3),
DFB.build_array("integer",num=num,nrange=(100,999)),
DFB.build_array("integer",num=num,nrange=(1,12)),
DFB.build_array("integer",num=num,nrange=(1,28)),
DFB.build_array("integer",num=num,nrange=(2010,2019)),
DFB.build_array("integer",num=num,nrange=(0,23)),
DFB.build_array("integer",num=num,nrange=(0,59)),
DFB.build_array("integer",num=num,nrange=(0,59)),
DFB.build_array("double",num=num,nrange=(1000,100000))],
['index','vin','division','month','day','year','hour','minute','second','price'])
lst_cars = [random.choice(['Honda','Toyota','Chevy','Ford','Tesla','Volkswagen','Hyundai','Jeep']) for x in range(num)]
df2 = DFB.arrays_to_dataframe(
[[int(x) for x in np.linspace(1,num,num)],
lst_cars],
['index','make'])
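If the DataFrameBuild helper isn't available, roughly the same random frame can be sketched with NumPy, pandas, and spark.createDataFrame. The column names and ranges mirror the cell above; the helper's exact behavior (e.g. how it generates the 3-character vin strings) is an assumption.
In [ ]:
# Rough stand-in for the DFB helpers above (a sketch, not DataFrameBuild itself)
import random
import string

def rand_strings(n, width=3):
    # random uppercase strings of fixed width, e.g. 'QXW'
    return [''.join(random.choices(string.ascii_uppercase, k=width)) for _ in range(n)]

pdf = pd.DataFrame({
    'index':    np.arange(1, num + 1),
    'vin':      rand_strings(num, width=3),
    'division': np.random.randint(100, 1000, num),
    'month':    np.random.randint(1, 13, num),
    'day':      np.random.randint(1, 29, num),
    'year':     np.random.randint(2010, 2020, num),
    'hour':     np.random.randint(0, 24, num),
    'minute':   np.random.randint(0, 60, num),
    'second':   np.random.randint(0, 60, num),
    'price':    np.random.uniform(1000, 100000, num),
})
df1_alt = spark.createDataFrame(pdf)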
Join!¶
In [128]:
df_sales = df1.alias("a")\
.join(df2.alias("b"),col("a.index") == col("b.index"),"inner")\
.drop(col("b.index"))
In [129]:
print(df_sales.count())
In [130]:
df_sales.limit(8).toPandas()
Out[130]:
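The alias/drop pattern above works; joining on the column name directly is an equivalent way to end up with a single index column:
In [ ]:
# Equivalent inner join on the shared column name; Spark keeps one `index` column this way
df_sales_alt = df1.join(df2, on="index", how="inner")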
Create a unique identifier column¶
In [131]:
unique_id = [col('make'),lit('_'),col('vin'), lit('_'), col('year')]
df_id = df_sales.withColumn('key',concat(*unique_id))
In [132]:
df_id.limit(8).toPandas()
Out[132]:
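Another way to build the same key is concat_ws, which takes the separator once instead of interleaving lit('_') columns (the integer year is cast to string explicitly here):
In [ ]:
# Same key sketched with concat_ws instead of concat + lit separators
df_id_alt = df_sales.withColumn('key', concat_ws('_', col('make'), col('vin'), col('year').cast('string')))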
In [205]:
# UDFs with lambdas!
# functions:
def leading_zeroes(x,width=2):
return str(x).zfill(width)
def nohyphens(z):
return re.sub("-","",str(z))
def date_from_str_mdY(z):
return datetime.strptime(z,'%m-%d-%Y')
def datetime_from_str_mdY_time(z):
return datetime.strptime(z,'%m-%d-%Y %H:%M:%S')
# dt = datetime.strptime("21/11/06 16:30", "%d/%m/%y %H:%M")
# udf's:
udf_leading_zeroes2 = udf(lambda z: leading_zeroes(z,2),StringType())
udf_leading_zeroes4 = udf(lambda z: leading_zeroes(z,4),StringType())
udf_leading_zeroes6 = udf(lambda z: leading_zeroes(z,6),StringType())
# NB: the dual use of "nohyphens" below: once returning a string, once cast to an integer
udf_nohyphen_str = udf(lambda z: nohyphens(z),StringType())
udf_nohyphen_int = udf(lambda z: int(nohyphens(z)),IntegerType())
# date, datetime:
udf_date_from_str_mdY = udf(lambda z: date_from_str_mdY(z),DateType())
udf_datetime_from_str_mdY_time = udf(lambda z: datetime_from_str_mdY_time(z),TimestampType())
In [198]:
print(leading_zeroes('67',6))
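Worth noting: Spark's built-in column functions cover the same ground as the padding and hyphen UDFs, without the Python serialization round trip. A small sketch:
In [ ]:
# Built-in alternatives to leading_zeroes / nohyphens (a sketch; results should match for these inputs)
spark.range(1).select(
    lpad(lit('67'), 6, '0').alias('padded'),                        # ~ leading_zeroes('67', 6)
    regexp_replace(lit('08-24-2012'), '-', '').alias('no_hyphens')  # ~ nohyphens('08-24-2012')
).show()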
Create a DateType with UDF¶
All at once¶
In [182]:
# just add date column
df_finaldate = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("date-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4"))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))\
.drop("month-2","day-2","year-4","date-hyphen")
In [183]:
# df_finaldate.printSchema()
df_finaldate.limit(4).toPandas()
Out[183]:
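The same DateType column can also be built without a Python UDF, using lpad, concat_ws, and to_date (a sketch; the 'MM-dd-yyyy' pattern assumes Spark 2.2+, where to_date accepts a format):
In [ ]:
# Built-in sketch of the same date column, no UDF round trip
df_finaldate_alt = df_id\
    .withColumn("date", to_date(concat_ws("-",
        lpad(col("month").cast("string"), 2, "0"),
        lpad(col("day").cast("string"), 2, "0"),
        col("year").cast("string")), "MM-dd-yyyy"))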
Step by step¶
In [184]:
# convert month-day-year into a zero-padded string like 08-24-2012, then derive string, integer, and date columns from it
df_dated1 = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("year-6",udf_leading_zeroes6(col("year")))
In [185]:
df_dated1.limit(4).toPandas()
Out[185]:
In [186]:
df_dated2 = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("date-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4"))\
.drop("month-2","day-2","year-4")
In [187]:
df_dated2.limit(4).toPandas()
Out[187]:
In [188]:
# now remove the hyphens to get the date back as a string or an integer, plus a proper DateType column
df_nohyphen = df_dated2\
.withColumn("int-date",udf_nohyphen_int(col("date-hyphen")))\
.withColumn("str-date",udf_nohyphen_str(col("date-hyphen")))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))
In [189]:
df_nohyphen.printSchema()
In [190]:
df_nohyphen.limit(4).toPandas()
Out[190]:
In [199]:
# same conversion, this time keeping only the DateType column
df_date = df_dated2\
.withColumn("int-date",udf_nohyphen_int(col("date-hyphen")))\
.withColumn("str-date",udf_nohyphen_str(col("date-hyphen")))\
.withColumn("date",udf_date_from_str_mdY(col("date-hyphen")))\
.drop("int-date","str-date","date-hyphen")
In [200]:
df_date.limit(4).toPandas()
Out[200]:
In [201]:
df_date.printSchema()
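The hyphen stripping and integer cast above also have built-in counterparts; a sketch against df_dated2:
In [ ]:
# Built-in counterparts to udf_nohyphen_str / udf_nohyphen_int (a sketch)
df_nohyphen_alt = df_dated2\
    .withColumn("str-date", regexp_replace(col("date-hyphen"), "-", ""))\
    .withColumn("int-date", regexp_replace(col("date-hyphen"), "-", "").cast("int"))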
Create TimestampType with UDF¶
In [215]:
# just add timestamp column
df_finaldatetime = df_id\
.withColumn("month-2",udf_leading_zeroes2(col("month")))\
.withColumn("day-2",udf_leading_zeroes2(col("day")))\
.withColumn("year-4",udf_leading_zeroes4(col("year")))\
.withColumn("hour-2",udf_leading_zeroes2(col("hour")))\
.withColumn("minute-2",udf_leading_zeroes2(col("minute")))\
.withColumn("second-2",udf_leading_zeroes2(col("second")))\
.withColumn("datetime-hyphen",concat("month-2",lit('-'),"day-2",lit('-'),"year-4",
lit(" "),"hour-2",lit(":"),"minute-2",lit(":"),"second-2"))\
.withColumn("datetime",udf_datetime_from_str_mdY_time(col("datetime-hyphen")))\
.drop("month-2","day-2","year-4","datetime-hyphen","hour-2","minute-2","second-2")
In [216]:
df_finaldatetime.printSchema()
In [217]:
df_finaldatetime.limit(4).toPandas()
Out[217]:
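As with the date, the timestamp can be assembled with built-ins; a sketch using format_string and to_timestamp (assumes Spark 2.2+ and integer-typed hour/minute/second columns):
In [ ]:
# Built-in sketch of the same TimestampType column
df_finaldatetime_alt = df_id\
    .withColumn("datetime", to_timestamp(
        format_string("%02d-%02d-%04d %02d:%02d:%02d",
                      col("month"), col("day"), col("year"),
                      col("hour"), col("minute"), col("second")),
        "MM-dd-yyyy HH:mm:ss"))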
In [ ]: