In [1]:
# My Standard Spark Session!
# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins  # keeps min/max/sum reachable after the pyspark.sql.functions star import below
import json
import functools
import operator
from itertools import product
# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None
# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName("myapp").getOrCreate()
# spark = SparkSession.builder.master("yarn")\
# .config("spark.executor.instances", "32")\
# .config("spark.executor.cores", "4")\
# .config("spark.executor.memory", "4G")\
# .config("spark.driver.memory", "4G")\
# .config("spark.executor.memoryOverhead","4G")\
# .config("spark.yarn.queue","Medium")\
# .appName("myapp")\
# .getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields","true")
In [2]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
# %reload_ext autoreload
# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)
from shared.app_context import *
from builder.DataFrameBuild import *
ctx = ApplicationContext("Dev-Job")
DFB = DataFrameBuild(ctx.spark)
print("%16s %s" % ("Python Version:",sys.version))
print("%16s %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s %s" % ("My Python Libs:",my_library))
print("%16s %s" % ("My Spark Dir:",my_spark))
print("%16s %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
In [3]:
# In a notebook __file__ is undefined, so the string '__file__' resolves relative to the cwd.
my_dir = os.path.dirname(os.path.abspath('__file__'))
print(my_dir)
Let's pretend someone gave us some car sales!
- df1 - first set
- df2 - second set
In [4]:
num = 100000
df1 = DFB.arrays_to_dataframe(
[DFB.build_array("string",num=num,width=8),
DFB.build_array("integer",num=num,nrange=(1,4)),
DFB.build_array("integer",num=num,nrange=(1,12)),
DFB.build_array("double",num=num,nrange=(0.0,10000)),
DFB.build_array("double",num=num,nrange=(0.0,25000)),
DFB.build_array("double",num=num,nrange=(500,2000)),
DFB.build_array("double",num=num,nrange=(1200,2700)),
DFB.build_array("double",num=num,nrange=(0.0,7500))],
['passwords','quarter','month','tenthousand','msrp','discount','tax','upgrades'])
df2 = DFB.arrays_to_dataframe(
[DFB.build_array("string",num=num,width=8),
DFB.build_array("integer",num=num,nrange=(1,4)),
DFB.build_array("integer",num=num,nrange=(1,12)),
DFB.build_array("double",num=num,nrange=(0.0,10000)),
DFB.build_array("double",num=num,nrange=(0.0,25000)),
DFB.build_array("double",num=num,nrange=(500,2000)),
DFB.build_array("double",num=num,nrange=(1200,2700)),
DFB.build_array("double",num=num,nrange=(0.0,7500))],
['passwords','quarter','month','tenthousand','msrp','discount','tax','upgrades'])
In [5]:
df1.printSchema()
In [6]:
df1.limit(4).toPandas()
Out[6]:
Examples:
- Filter certain values from a column: some in, some out.
- Do multi-column aggregation and orderBy (ascending, descending).
- Use a user-defined function (UDF) to do column manipulations (a sketch follows this list).
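The UDF bullet never gets its own cell below, so here is a minimal sketch; the net_price name and the msrp-minus-discount arithmetic are illustrative assumptions, not part of the original data model:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# Hypothetical UDF: net price after discount.
net_price = udf(lambda msrp, discount: msrp - discount, DoubleType())
df1.withColumn("net_price", net_price(col("msrp"), col("discount"))).limit(4).toPandas()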
In [20]:
months_not_jan_dec = list(range(2, 12))  # months 2 through 11
months_jan_dec = [1,12]
print(months_not_jan_dec)
Let's see what happens if I cut January and December out of df1 and df2.
I'm going to use "isin", "filter", and the tilde (~) to exclude those months.
The record counts won't be an exact match across the two dataframes, but statistically they should be close!
In [21]:
print(df1.count())
print(df2.count())
Filter 2 of 12 months out/in
In [24]:
df1_febnov = df1.filter(~col("month").isin(months_jan_dec))
In [25]:
df2_febnov = df2.filter(~col("month").isin(months_jan_dec))
In [31]:
df1_jan_dec = df1.filter(col("month").isin(months_jan_dec))
df2_jan_dec = df2.filter(col("month").isin(months_jan_dec))
In [32]:
print(df1_febnov.count())
print(df2_febnov.count())
print(df1_jan_dec.count())
print(df2_jan_dec.count())
Statistics work!
In [33]:
# My exclusion of 2 of the 12 months left 83552 rows the first time I ran it.
print(83552/100000)
print(10/12)
print(2/12)
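As a sanity check on that claim: keeping 10 of 12 uniformly distributed months should retain about num * 10/12 rows. A quick sketch using the count reported above:

expected = 100000 * 10 / 12   # ~83333.3 rows expected to survive the filter
observed = 83552              # first-run count quoted above
print((observed - expected) / expected)  # relative error ~0.26%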
Let's check that months 1 and 12 are no longer in the dataframes by using:
- list <- set(unique) <- collected rows
- min and max
- distinct / countDistinct
In [34]:
# Let's convert a single column to a Python list (collect() pulls every row to the driver).
months_still_avail = list(set([x.month for x in df1_febnov.collect()]))
In [35]:
print(months_still_avail)
# They're all still there!
# [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
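Collecting the entire dataframe just to inspect one column is expensive. A lighter sketch that only pulls the distinct month values to the driver:

months_still_avail = [row.month for row in
                      df1_febnov.select("month").distinct().collect()]
print(sorted(months_still_avail))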
In [36]:
df1_febnov.select("month").agg(max("month")).show()
df1_febnov.select("month").agg(min("month")).show()
In [38]:
df1_febnov.select("month").distinct().orderBy("month").show()
Do multi-column aggregation with sum.
- Let's check the sums of msrp, discount, tax, and upgrades by month and compare them to the other dataset.
In [39]:
# price-based columns
price_cols = ['msrp','discount','tax','upgrades']
# NB: sum here is pyspark.sql.functions.sum (via the star import), not Python's builtin
price_sum_expr = [sum(x) for x in price_cols]
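If the default sum(msrp)-style column names get unwieldy, a variant that aliases each aggregate (the sum_ prefix and the _aliased name are my own; the cells below keep using the original expressions):

price_sum_expr_aliased = [sum(x).alias("sum_" + x) for x in price_cols]
# orderBy("sum(msrp)") would then become orderBy("sum_msrp")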
In [40]:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("sum(msrp)",ascending=False)\
.limit(50).toPandas()
Out[40]:
In [41]:
# ascending:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month")\
.limit(50).toPandas()
Out[41]:
In [44]:
# descending:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month",ascending=False)\
.limit(50).toPandas()
Out[44]:
In [45]:
# descending:
df2_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month",ascending=False)\
.limit(50).toPandas()
Out[45]:
Let's find the max msrp in the summertime: months 6, 7, or 8.
In [62]:
# NB: filter *keeps* the rows that match the condition (here, months 6, 7, and 8).
df1_678 = df1\
.filter((col("month") == 6) | (col("month") == 7) | (col("month") == 8))
In [63]:
df1_678.count()
Out[63]:
In [67]:
df1_678.orderBy(col("msrp"),ascending=False).limit(10).toPandas()
Out[67]:
How can we more simply get the top records for 3 different months (6, 7, 8)?
In [101]:
df1\
.filter(col("month") == 6)\
.orderBy(col("msrp"),ascending=False).limit(3).toPandas()
Out[101]:
In [102]:
df_6 = df1\
.filter(col("month") == 6)\
.orderBy(col("msrp"),ascending=False)
df_7 = df1\
.filter(col("month") == 7)\
.orderBy(col("msrp"),ascending=False)
df_8 = df1\
.filter(col("month") == 8)\
.orderBy(col("msrp"),ascending=False)
In [103]:
df_6.show(1)
df_7.show(1)
df_8.show(1)
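Since each frame is already sorted, one quick way to stitch the three winners into a single result is to union the top row of each:

df_6.limit(1).union(df_7.limit(1)).union(df_8.limit(1)).toPandas()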
In [106]:
df_6.describe().limit(10).toPandas()
Out[106]:
In [104]:
# Scratch: earlier, abandoned attempts at a per-month max, kept commented out.
# df1_678\
# .describe("month","msrp").show()
# .orderBy(col("month"),max(col("msrp")),ascending=False)\
# .limit(10).toPandas()
# .select("month","msrp")\
# .groupBy(col("month"))\