spark2_dfanalysis

DataFrame analysis in PySpark


Categorize
In [2]:
# My Standard Spark Session!

# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
from itertools import product

# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None

# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields","true")
In [3]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
#  %reload_ext autoreload


# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)


from shared.app_context import *
from builder.DataFrameBuild import *

ctx = ApplicationContext("Dev-Job")

DFB = DataFrameBuild(ctx.spark)

print("%16s  %s" % ("Python Version:",sys.version))
print("%16s  %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s  %s" % ("My Python Libs:",my_library))
print("%16s  %s" % ("My Spark Dir:",my_spark))
print("%16s  %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
 Python Version:  3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:25:24) [MSC v.1900 64 bit (AMD64)]
    Python Path:  C:\Users\d810216\AppData\Local\conda\conda\envs\my_root
 My Python Libs:  C:\Users\d810216/.myconfigs
   My Spark Dir:  C:\Users\d810216/spark2_dfanalysis
   My Spark Ctx:  <pyspark.sql.session.SparkSession object at 0x0000014D750C4358>
In [25]:
# 6 columns: strings, ints, float
import random

num = 100000

lst_months = ['January','February','March','April','May','June',
              'July','August','September','October','November','December']

indices = list(range(num))
# print(indices)

years = [random.randint(1900,2019) for x in range(num)]
# print(years)

ages = [(2019 - x) for x in years]
# print(ages)

months = [random.choice(lst_months) for x in range(num)]
# print(months)

worth = [random.random() * 100000 for x in range(num)]
# print(worth)

lst_names = ['sarah','bill','steve','mary','alyssa','brian','elizabeth','josh','ryan','katie','connor',
             'erica','lisa','doug','stacie','chris','gary','tom','ellen','paula']
names = [random.choice(lst_names) for x in range(num)]


df = DFB.arrays_to_dataframe([indices,names,months,years,ages,worth],
                             ['index','name','month','year','age','net_worth'])


df.limit(4).toPandas()
Out[25]:
index name month year age net_worth
0 0 mary September 1902 117 61,878.66
1 1 doug February 1915 104 6,831.12
2 2 gary July 1992 27 58,481.80
3 3 mary June 1941 78 33,809.77
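
DFB.arrays_to_dataframe is a helper from this repo's builder module. If you don't have it handy, the same frame can be built with plain PySpark. A minimal sketch (same column names as above, letting Spark infer the types):

# Sketch: stand-in for DFB.arrays_to_dataframe -- zip the parallel
# lists into rows and let createDataFrame infer the column types.
rows = list(zip(indices, names, months, years, ages, worth))
df = spark.createDataFrame(rows, ['index','name','month','year','age','net_worth'])
df.limit(4).toPandas()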

Working Example:

Let's categorize each net_worth value as low, medium, or high:

  • low: x <= 20,000
  • medium: 20,000 < x < 34,000
  • high: x >= 34,000
In [26]:
def get_price_range(x):
    if x <= 20000:
        return 'low'
    elif x < 34000:
        return 'medium'
    else:
        return 'high'

# UDF:
udf_price_range = udf(get_price_range, StringType())
In [27]:
df_lmh = df\
.withColumn("price_range",udf_price_range(col("net_worth")))
In [28]:
df_lmh.limit(10).toPandas()
Out[28]:
index name month year age net_worth price_range
0 0 mary September 1902 117 61,878.66 high
1 1 doug February 1915 104 6,831.12 low
2 2 gary July 1992 27 58,481.80 high
3 3 mary June 1941 78 33,809.77 medium
4 4 josh April 1970 49 20,969.97 medium
5 5 stacie December 1992 27 17,460.01 low
6 6 sarah March 2002 17 30,615.19 medium
7 7 gary June 1911 108 65,697.47 high
8 8 alyssa August 1949 70 15,886.09 low
9 9 josh February 1958 61 94,902.19 high
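
As an aside: a Python UDF ships every row through a Python worker, which gets slow on large data. The same buckets can be produced with Spark's built-in when/otherwise column expressions; a sketch matching get_price_range above (df_lmh_native is a hypothetical name for the result):

# Sketch: same buckets as get_price_range, expressed without a Python UDF
df_lmh_native = df.withColumn(
    "price_range",
    when(col("net_worth") <= 20000, "low")
    .when(col("net_worth") < 34000, "medium")
    .otherwise("high"))

When the splits are purely numeric, pyspark.ml.feature.Bucketizer is another option.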

Now tally the results

In [29]:
df_lmh\
.groupBy("price_range")\
.count()\
.orderBy("count",ascending=False)\
.limit(5)\
.toPandas()
Out[29]:
price_range count
0 high 66173
1 low 19901
2 medium 13926
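
To report each bucket's share rather than a raw tally, one option is to divide by the total row count. A sketch (here the total could also just be num from the generator cell):

# Sketch: add a percentage column next to the raw count
total = df_lmh.count()
df_lmh\
.groupBy("price_range")\
.count()\
.withColumn("pct", round(col("count") / total * 100, 2))\
.orderBy("count", ascending=False)\
.toPandas()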

Filter by a single category and aggregate the results

In [30]:
df_lmh\
.filter(col("price_range") == 'high')\
.agg(count("price_range"),min("net_worth"),max("net_worth"),mean("net_worth"),sum("net_worth"))\
.limit(5)\
.toPandas()
Out[30]:
count(price_range) min(net_worth) max(net_worth) avg(net_worth) sum(net_worth)
0 66173 34,000.33 99,998.52 67,032.90 4,435,768,036.16
In [32]:
df_lmh\
.filter(col("price_range") == 'medium')\
.agg(count("price_range"),min("net_worth"),max("net_worth"),mean("net_worth"),sum("net_worth"))\
.limit(5)\
.toPandas()
Out[32]:
count(price_range) min(net_worth) max(net_worth) avg(net_worth) sum(net_worth)
0 13926 20,000.02 33,996.10 27,056.71 376,791,781.13
In [33]:
df_lmh\
.filter(col("price_range") == 'low')\
.agg(count("price_range"),min("net_worth"),max("net_worth"),mean("net_worth"),sum("net_worth"))\
.limit(5)\
.toPandas()
Out[33]:
count(price_range) min(net_worth) max(net_worth) avg(net_worth) sum(net_worth)
0 19901 2.30 19,998.48 10,052.31 200,051,119.47

Or group by the category column to get the same aggregates for all categories at once

In [31]:
df_lmh\
.groupBy("price_range")\
.agg(count("price_range"),min("net_worth"),max("net_worth"),mean("net_worth"),sum("net_worth"))\
.limit(5)\
.toPandas()
Out[31]:
price_range count(price_range) min(net_worth) max(net_worth) avg(net_worth) sum(net_worth)
0 low 19901 2.30 19,998.48 10,052.31 200,051,119.47
1 high 66173 34,000.33 99,998.52 67,032.90 4,435,768,036.16
2 medium 13926 20,000.02 33,996.10 27,056.71 376,791,781.13
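
The default aggregate headers (count(price_range), min(net_worth), ...) are clumsy to reference downstream; alias gives readable names. A sketch of the same aggregation (the alias names here are just illustrative):

# Sketch: same per-category summary, with readable column names
df_lmh\
.groupBy("price_range")\
.agg(count("price_range").alias("n"),
     min("net_worth").alias("min_worth"),
     max("net_worth").alias("max_worth"),
     mean("net_worth").alias("avg_worth"),
     sum("net_worth").alias("sum_worth"))\
.orderBy("price_range")\
.toPandas()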