spark2_dfanalysis

Dataframe analysis in PySpark


Analysis_basic_group_orderBy_aggregate_describe
In [1]:
# My Standard Spark Session!

# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
from itertools import product

# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None

# Spark!
from pyspark import SparkContext
# NB: these wildcard imports shadow the Python builtins sum/min/max with the
# Spark column functions; use the `builtins` module when the originals are needed.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields", "100")  # takes a field count, not a boolean
In [2]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
#  %reload_ext autoreload


# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)


from shared.app_context import *
from builder.DataFrameBuild import *

ctx = ApplicationContext("Dev-Job")

DFB = DataFrameBuild(ctx.spark)

print("%16s  %s" % ("Python Version:",sys.version))
print("%16s  %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s  %s" % ("My Python Libs:",my_library))
print("%16s  %s" % ("My Spark Dir:",my_spark))
print("%16s  %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
 Python Version:  3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:25:24) [MSC v.1900 64 bit (AMD64)]
    Python Path:  C:\Users\d810216\AppData\Local\conda\conda\envs\my_root
 My Python Libs:  C:\Users\d810216/.myconfigs
   My Spark Dir:  C:\Users\d810216/spark2_dfanalysis
   My Spark Ctx:  <pyspark.sql.session.SparkSession object at 0x0000016736A53390>
In [3]:
my_dir = os.path.dirname(os.path.abspath('__file__'))
print(my_dir)
C:\Users\d810216\spark2_dfanalysis\notebooks\dev

Let's pretend someone gave us some car sales!

  • df1 - first set
  • df2 - second set
In [4]:
num = 100000


df1 = DFB.arrays_to_dataframe(
    [DFB.build_array("string",num=num,width=8),
     DFB.build_array("integer",num=num,nrange=(1,4)),
     DFB.build_array("integer",num=num,nrange=(1,12)),
     DFB.build_array("double",num=num,nrange=(0.0,10000)),
     DFB.build_array("double",num=num,nrange=(0.0,25000)),
     DFB.build_array("double",num=num,nrange=(500,2000)),
     DFB.build_array("double",num=num,nrange=(1200,2700)),
     DFB.build_array("double",num=num,nrange=(0.0,7500))],
    ['passwords','quarter','month','tenthousand','msrp','discount','tax','upgrades'])


df2 = DFB.arrays_to_dataframe(
    [DFB.build_array("string",num=num,width=8),
     DFB.build_array("integer",num=num,nrange=(1,4)),
     DFB.build_array("integer",num=num,nrange=(1,12)),
     DFB.build_array("double",num=num,nrange=(0.0,10000)),
     DFB.build_array("double",num=num,nrange=(0.0,25000)),
     DFB.build_array("double",num=num,nrange=(500,2000)),
     DFB.build_array("double",num=num,nrange=(1200,2700)),
     DFB.build_array("double",num=num,nrange=(0.0,7500))],
    ['passwords','quarter','month','tenthousand','msrp','discount','tax','upgrades'])
In [5]:
df1.printSchema()
root
 |-- passwords: string (nullable = true)
 |-- quarter: long (nullable = true)
 |-- month: long (nullable = true)
 |-- tenthousand: double (nullable = true)
 |-- msrp: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- tax: double (nullable = true)
 |-- upgrades: double (nullable = true)

In [6]:
df1.limit(4).toPandas()
Out[6]:
passwords quarter month tenthousand msrp discount tax upgrades
0 snawehoh 1 5 9,669.54 21,075.52 616.79 2,003.87 7,063.73
1 qusxutsf 1 3 3,997.48 11,419.71 1,493.40 1,859.85 2,092.91
2 hrngoyat 1 6 2,105.70 8,012.16 1,512.80 2,195.59 1,231.33
3 yswxtokx 3 7 9,924.58 20,964.81 991.76 2,512.14 6,622.61

Examples:

  • Filter certain values from a column: some in, some out.
  • Do multi-column aggregation, orderBy (ascending, descending)
  • Use some user defined functions (UDF) to do column manipulations.
In [20]:
months_not_jan_dec = list(range(2, 12))
months_jan_dec = [1, 12]
print(months_not_jan_dec)
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Let's see what happens if I cut January and December out of df1 and df2.
I'm going to use "isin", "filter", and the tilde (~) to exclude those rows.
The two record counts will not match exactly, but statistically they should be close!

In [21]:
print(df1.count())
print(df2.count())
100000
100000

Filter 2 of 12 months out/in

In [24]:
df1_febnov = df1.filter(~col("month").isin(months_jan_dec))
In [25]:
df2_febnov = df2.filter(~col("month").isin(months_jan_dec))
In [31]:
df1_jan_dec = df1.filter(col("month").isin(months_jan_dec))
df2_jan_dec = df2.filter(col("month").isin(months_jan_dec))
In [32]:
print(df1_febnov.count())
print(df2_febnov.count())

print(df1_jan_dec.count())
print(df2_jan_dec.count())
83234
83424
16766
16576

Statistics work!

In [33]:
# my exclusion of 2 out of 12 months came to 83552 the first time I ran it.
print(83552/100000)
print(10/12)

print(2/12)
0.83552
0.8333333333333334
0.16666666666666666

Let's check that month 1 and 12 are no longer in the dataframes by using:

  • list <- set(unique) <- row
  • get the min, max
  • countDistinct
In [34]:
# Collect the month column back to the driver and keep the distinct values.
months_still_avail = list(set([x.month for x in df1_febnov.select("month").collect()]))
In [35]:
print(months_still_avail)

# They're all still there!
# [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
In [36]:
df1_febnov.select("month").agg(max("month")).show()
df1_febnov.select("month").agg(min("month")).show()
+----------+
|max(month)|
+----------+
|        11|
+----------+

+----------+
|min(month)|
+----------+
|         2|
+----------+

In [38]:
df1_febnov.select("month").distinct().orderBy("month").show()
+-----+
|month|
+-----+
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
+-----+

Do multi-column aggregation with sum.

  • Let's check the sums of msrp, discount, tax, and upgrades by month, and compare them to the other dataset
In [39]:
# price-based columns

price_cols = ['msrp','discount','tax','upgrades']

# `sum` here is pyspark.sql.functions.sum (from the wildcard import),
# so each entry is a Column expression, not a Python number.
price_sum_expr = [sum(x) for x in price_cols]
In [40]:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("sum(msrp)",ascending=False)\
.limit(50).toPandas()
Out[40]:
month sum(msrp) sum(discount) sum(tax) sum(upgrades)
0 11 105,855,753.40 10,558,502.31 16,512,540.38 31,766,978.51
1 8 105,697,285.00 10,526,173.63 16,468,807.62 31,390,778.07
2 3 105,239,178.20 10,529,493.56 16,539,723.12 31,558,022.14
3 2 105,194,017.01 10,469,590.24 16,443,190.99 31,354,700.09
4 5 104,998,096.65 10,361,421.23 16,327,075.48 31,409,763.98
5 6 103,534,556.47 10,376,541.18 16,198,138.56 31,359,100.60
6 4 103,040,760.47 10,331,847.70 16,158,364.92 31,151,526.39
7 7 102,560,396.91 10,357,990.08 16,104,010.11 31,081,796.87
8 10 102,540,099.71 10,242,788.20 15,953,669.97 30,850,906.23
9 9 99,945,124.35 9,960,439.89 15,639,564.24 29,860,856.27
In [41]:
# ascending:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month")\
.limit(50).toPandas()
Out[41]:
month sum(msrp) sum(discount) sum(tax) sum(upgrades)
0 2 105,194,017.01 10,469,590.24 16,443,190.99 31,354,700.09
1 3 105,239,178.20 10,529,493.56 16,539,723.12 31,558,022.14
2 4 103,040,760.47 10,331,847.70 16,158,364.92 31,151,526.39
3 5 104,998,096.65 10,361,421.23 16,327,075.48 31,409,763.98
4 6 103,534,556.47 10,376,541.18 16,198,138.56 31,359,100.60
5 7 102,560,396.91 10,357,990.08 16,104,010.11 31,081,796.87
6 8 105,697,285.00 10,526,173.63 16,468,807.62 31,390,778.07
7 9 99,945,124.35 9,960,439.89 15,639,564.24 29,860,856.27
8 10 102,540,099.71 10,242,788.20 15,953,669.97 30,850,906.23
9 11 105,855,753.40 10,558,502.31 16,512,540.38 31,766,978.51
In [44]:
# descending:
df1_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month",ascending=False)\
.limit(50).toPandas()
Out[44]:
month sum(msrp) sum(discount) sum(tax) sum(upgrades)
0 11 105,855,753.40 10,558,502.31 16,512,540.38 31,766,978.51
1 10 102,540,099.71 10,242,788.20 15,953,669.97 30,850,906.23
2 9 99,945,124.35 9,960,439.89 15,639,564.24 29,860,856.27
3 8 105,697,285.00 10,526,173.63 16,468,807.62 31,390,778.07
4 7 102,560,396.91 10,357,990.08 16,104,010.11 31,081,796.87
5 6 103,534,556.47 10,376,541.18 16,198,138.56 31,359,100.60
6 5 104,998,096.65 10,361,421.23 16,327,075.48 31,409,763.98
7 4 103,040,760.47 10,331,847.70 16,158,364.92 31,151,526.39
8 3 105,239,178.20 10,529,493.56 16,539,723.12 31,558,022.14
9 2 105,194,017.01 10,469,590.24 16,443,190.99 31,354,700.09
In [45]:
# descending:
df2_febnov.groupBy("month")\
.agg(*price_sum_expr)\
.orderBy("month",ascending=False)\
.limit(50).toPandas()
Out[45]:
month sum(msrp) sum(discount) sum(tax) sum(upgrades)
0 11 104,566,032.80 10,499,755.73 16,294,888.87 31,559,189.80
1 10 104,375,392.57 10,354,372.50 16,294,745.72 31,409,742.93
2 9 102,872,863.72 10,271,764.28 16,105,473.91 30,758,758.34
3 8 107,086,689.03 10,730,774.73 16,716,835.47 32,130,763.87
4 7 104,942,508.47 10,385,217.35 16,199,618.13 31,437,732.13
5 6 103,759,385.61 10,363,793.87 16,238,418.47 31,121,530.60
6 5 103,054,766.12 10,317,237.28 16,112,904.76 30,863,318.19
7 4 105,364,638.63 10,619,500.07 16,518,245.47 31,620,804.66
8 3 103,117,723.14 10,334,129.62 16,121,377.62 30,672,249.74
9 2 104,752,423.97 10,475,493.76 16,319,597.62 31,350,142.40

Let's find the max msrp in the summertime: months 6, 7, or 8.

In [62]:
# NB: this filter keeps (rather than drops) the matching rows.
df1_678 = df1\
.filter((col("month") == 6) | (col("month") == 7) | (col("month") == 8))
In [63]:
df1_678.count()
Out[63]:
25071
In [67]:
df1_678.orderBy(col("msrp"),ascending=False).limit(10).toPandas()
Out[67]:
passwords quarter month tenthousand msrp discount tax upgrades
0 pzjjmgch 2 6 1,461.84 24,999.55 1,215.89 2,437.16 2,394.53
1 zlerldky 3 8 2,945.34 24,998.63 754.38 2,252.98 5,477.38
2 vhzkplyb 3 6 9,013.47 24,998.57 1,641.26 1,771.71 2,434.50
3 awgmripr 4 6 9,752.61 24,997.36 1,479.48 2,253.08 2,747.57
4 wxzlxxrf 4 6 5,929.58 24,996.64 1,530.11 2,378.69 6,041.52
5 pzmljyww 3 7 129.64 24,995.10 898.19 2,326.50 3,674.86
6 nzrnsvoj 4 6 3,402.12 24,994.57 1,750.99 2,506.38 2,927.07
7 exwjeyxb 3 8 8,131.44 24,993.42 1,754.94 2,258.11 1,999.24
8 axyfsxjd 2 8 57.31 24,990.24 1,282.29 1,351.13 792.16
9 jtvrrsxq 3 7 3,065.87 24,987.01 1,171.31 1,463.08 6,930.55

How can we more simply get the top records for each of the three months (6, 7, 8)?

In [101]:
df1\
.filter(col("month") == 6)\
.orderBy(col("msrp"),ascending=False).limit(3).toPandas()
Out[101]:
passwords quarter month tenthousand msrp discount tax upgrades
0 pzjjmgch 2 6 1,461.84 24,999.55 1,215.89 2,437.16 2,394.53
1 vhzkplyb 3 6 9,013.47 24,998.57 1,641.26 1,771.71 2,434.50
2 awgmripr 4 6 9,752.61 24,997.36 1,479.48 2,253.08 2,747.57
In [102]:
df_6 = df1\
.filter(col("month") == 6)\
.orderBy(col("msrp"),ascending=False)

df_7 = df1\
.filter(col("month") == 7)\
.orderBy(col("msrp"),ascending=False)

df_8 = df1\
.filter(col("month") == 8)\
.orderBy(col("msrp"),ascending=False)
In [103]:
df_6.show(1)
df_7.show(1)
df_8.show(1)
+---------+-------+-----+-----------------+----------------+-----------------+------------------+-----------------+
|passwords|quarter|month|      tenthousand|            msrp|         discount|               tax|         upgrades|
+---------+-------+-----+-----------------+----------------+-----------------+------------------+-----------------+
| pzjjmgch|      2|    6|1461.839155233503|24999.5510895652|1215.886952359922|2437.1608631895583|2394.533450662693|
+---------+-------+-----+-----------------+----------------+-----------------+------------------+-----------------+
only showing top 1 row

+---------+-------+-----+------------------+------------------+----------------+------------------+------------------+
|passwords|quarter|month|       tenthousand|              msrp|        discount|               tax|          upgrades|
+---------+-------+-----+------------------+------------------+----------------+------------------+------------------+
| pzmljyww|      3|    7|129.63961701409232|24995.101445610413|898.193515120447|2326.5023914907297|3674.8558939517206|
+---------+-------+-----+------------------+------------------+----------------+------------------+------------------+
only showing top 1 row

+---------+-------+-----+------------------+------------------+-----------------+-----------------+-----------------+
|passwords|quarter|month|       tenthousand|              msrp|         discount|              tax|         upgrades|
+---------+-------+-----+------------------+------------------+-----------------+-----------------+-----------------+
| zlerldky|      3|    8|2945.3409530579634|24998.627775253066|754.3752188408489|2252.983491174753|5477.381555635178|
+---------+-------+-----+------------------+------------------+-----------------+-----------------+-----------------+
only showing top 1 row

In [106]:
df_6.describe().limit(10).toPandas()
Out[106]:
summary passwords quarter month tenthousand msrp discount tax upgrades
0 count 8348 8348 8348 8348 8348 8348 8348 8348
1 mean None 2.517010062290369 6.0 5028.778305493424 12402.318695149945 1242.997266868616 1940.3615905621198 3756.480666121937
2 stddev None 1.1215023004819005 0.0 2900.2663022450442 7190.538368411292 437.36113779212064 432.9776416816437 2156.3171914986347
3 min aadniyke 1 6 1.978207195396653 2.086988442212556 500.07552449963055 1200.0089130961217 1.171347992580396
4 max zzunmhwo 4 6 9998.62187374124 24999.5510895652 1999.892101561227 2699.967922279053 7499.780700993599
In [104]:
# df1_678\
# .describe("month","msrp").show()


# .orderBy(col("month"),max(col("msrp")),ascending=False)\
# .limit(10).toPandas()

# .select("month","msrp")\
#.groupBy(col("month"))\