spark2_dfanalysis

Dataframe analysis in PySpark


Example_Dataframes
In [3]:
# My Standard Spark Session!

# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
from itertools import product

# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None

# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields","true")
In [4]:
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
#  %reload_ext autoreload


# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)


from shared.app_context import *
from builder.DataFrameBuild import *

ctx = ApplicationContext("Dev-Job")

DFB = DataFrameBuild(ctx.spark)

print("%16s  %s" % ("Python Version:",sys.version))
print("%16s  %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s  %s" % ("My Python Libs:",my_library))
print("%16s  %s" % ("My Spark Dir:",my_spark))
print("%16s  %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
 Python Version:  3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:25:24) [MSC v.1900 64 bit (AMD64)]
    Python Path:  C:\Users\d810216\AppData\Local\conda\conda\envs\my_root
 My Python Libs:  C:\Users\d810216/.myconfigs
   My Spark Dir:  C:\Users\d810216/spark2_dfanalysis
   My Spark Ctx:  <pyspark.sql.session.SparkSession object at 0x000002521C8FC4E0>
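
ApplicationContext and DataFrameBuild come from this repo's shared and builder packages; their implementations are not shown in this notebook. The stand-in below is only an assumption inferred from how build_array is called further down (random fixed-width strings, or ints/doubles drawn from an inclusive nrange):

import random
import string

# Hypothetical stand-in for DFB.build_array, inferred from its call sites below.
def build_array_sketch(kind, num=10, width=3, nrange=(0, 100)):
    low, high = nrange
    if kind == "string":     # e.g. 'nki', 'kfd' -- random lowercase strings of length `width`
        return [''.join(random.choice(string.ascii_lowercase) for _ in range(width))
                for _ in range(num)]
    if kind == "integer":    # random ints in [low, high]
        return [random.randint(low, high) for _ in range(num)]
    if kind == "double":     # random doubles in [low, high]
        return [random.uniform(low, high) for _ in range(num)]
    raise ValueError("unsupported kind: %s" % kind)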

Example Dataframes:

In [16]:
num = 10000
In [17]:
# 2 columns: integer, string
import random

lst_cars = [random.choice(['Honda','Toyota','Chevy','Ford','Tesla','Volkswagen','Hyundai','Jeep'])
            for x in range(num)]

df2 = DFB.arrays_to_dataframe(
    [[int(x) for x in np.linspace(1,num,num)],
     lst_cars],
    ['index','make'])

df2.limit(4).toPandas()
Out[17]:
   index   make
0      1  Chevy
1      2  Tesla
2      3  Tesla
3      4   Ford
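
arrays_to_dataframe takes a list of parallel column arrays plus a list of column names. A rough plain-PySpark equivalent (an assumption about what the helper does, not its actual implementation) is to zip the arrays into row tuples and pass them to spark.createDataFrame:

# Hypothetical stand-in for DFB.arrays_to_dataframe: zip the parallel column
# arrays into row tuples and let Spark infer the schema from the Python types.
def arrays_to_dataframe_sketch(spark, arrays, names):
    rows = [tuple(vals) for vals in zip(*arrays)]
    return spark.createDataFrame(rows, names)

df2_alt = arrays_to_dataframe_sketch(
    spark,
    [[int(x) for x in np.linspace(1, num, num)], lst_cars],
    ['index', 'make'])
df2_alt.limit(4).toPandas()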
In [33]:
# 5 columns: strings, ints, float
import random

lst_months = ['January','February','March','April','May','June',
              'July','August','September','October','November','December']

years = [random.randint(1900,2019) for x in range(num)]
# print(years)

ages = [(2019 - x) for x in years]
# print(ages)

months = [random.choice(lst_months) for x in range(num)]
# print(months)

worth = [random.random() * 100000 for x in range(num)]
# print(worth)

lst_names = ['sarah','bill','steve','mary','alyssa','brian','elizabeth','josh','ryan','katie','connor','erica','lisa','doug']
names = [random.choice(lst_names) for x in range(num)]


df = DFB.arrays_to_dataframe([names,months,years,ages,worth],
                             ['name','month','year','age','net_worth'])


df.limit(4).toPandas()
Out[33]:
        name      month  year  age  net_worth
0  elizabeth   February  1928   91   2,969.73
1     connor    October  1968   51  80,265.32
2       bill       July  1917  102  64,271.73
3     alyssa  September  2010    9  17,296.99
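
With df built, a typical first analysis step is a grouped aggregation. The sketch below is illustrative only; the columns come from df above, but the metric choices are arbitrary:

from pyspark.sql import functions as F

# Rows, average net worth, and most recent birth year per name,
# sorted by average net worth descending.
summary = (df.groupBy("name")
             .agg(F.count("*").alias("rows"),
                  F.round(F.avg("net_worth"), 2).alias("avg_net_worth"),
                  F.max("year").alias("latest_year"))
             .orderBy(F.desc("avg_net_worth")))

summary.limit(5).toPandas()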
In [18]:
# 10 columns: index(int), strings, ints, double

df1 = DFB.arrays_to_dataframe(
    [[int(x) for x in np.linspace(1,num,num)],
     DFB.build_array("string",num=num,width=3),
     DFB.build_array("integer",num=num,nrange=(100,999)),
     DFB.build_array("integer",num=num,nrange=(1,12)),
     DFB.build_array("integer",num=num,nrange=(1,28)),
     DFB.build_array("integer",num=num,nrange=(2010,2019)),
     DFB.build_array("integer",num=num,nrange=(0,23)),
     DFB.build_array("integer",num=num,nrange=(0,59)),
     DFB.build_array("integer",num=num,nrange=(0,59)),
     DFB.build_array("double",num=num,nrange=(1000,100000))],
    ['index','vin','division','month','day','year','hour','minute','second','price'])

df1.limit(4).toPandas()
Out[18]:
   index  vin  division  month  day  year  hour  minute  second      price
0      1  nki       787     11    1  2016    16      47      35  97,041.59
1      2  kfd       105      4   13  2014     6      14      34  17,033.22
2      3  jty       607      7   21  2013    17       7      34  57,138.44
3      4  aqt       184     10   24  2011     1      15      21  68,126.04
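
The separate date/time parts in df1 can be folded into a single timestamp column, which makes range filters straightforward. Here is a minimal sketch using standard Spark functions; the column names are from df1 above, while the filter thresholds are illustrative:

from pyspark.sql import functions as F

# Format the integer parts into an ISO-like string, then parse it as a timestamp.
df1_ts = df1.withColumn(
    "event_ts",
    F.to_timestamp(
        F.format_string("%04d-%02d-%02d %02d:%02d:%02d",
                        "year", "month", "day", "hour", "minute", "second"),
        "yyyy-MM-dd HH:mm:ss"))

# Example filter: prices over 50k in 2015 or later.
(df1_ts
 .filter((F.col("price") > 50000) & (F.year("event_ts") >= 2015))
 .limit(4)
 .toPandas())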