<!DOCTYPE html>
In [3]:
# My Standard Spark Session!
# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
import random  # FIX: imported here so later cells that call random.* work on a fresh kernel (it was only imported mid-notebook)
from itertools import product
# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None
# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row
# Local SparkSession (reuses an existing one if the kernel already has it).
# The commented block below is the yarn/cluster configuration used off-local.
spark = SparkSession.builder.appName("myapp").getOrCreate()
# spark = SparkSession.builder.master("yarn")\
# .config("spark.executor.instances", "32")\
# .config("spark.executor.cores", "4")\
# .config("spark.executor.memory", "4G")\
# .config("spark.driver.memory", "4G")\
# .config("spark.executor.memoryOverhead","4G")\
# .config("spark.yarn.queue","Medium")\
# .appName("myapp")\
# .getOrCreate()
sc = spark.sparkContext
# Keep partition column values as strings instead of inferring numeric types.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
# FIX: maxToStringFields is an integer field-count limit, not a boolean;
# "true" was previously set here, which is not a valid value for it.
spark.conf.set("spark.debug.maxToStringFields", "200")
In [4]:
# Reload edited local modules automatically, without restarting the kernel.
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
# %reload_ext autoreload
# mylib:
# Local library locations, appended to sys.path so the imports below resolve.
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)
# NOTE(review): wildcard imports bring ApplicationContext and DataFrameBuild
# into scope from the local packages; prefer explicit imports once stable.
from shared.app_context import *
from builder.DataFrameBuild import *
# ApplicationContext presumably wraps its own SparkSession (ctx.spark) -- confirm against shared/app_context.
ctx = ApplicationContext("Dev-Job")
# DFB is the dataframe-building helper used by every example cell below.
DFB = DataFrameBuild(ctx.spark)
# Echo environment details so a reader can see which interpreter/libs ran this.
print("%16s %s" % ("Python Version:",sys.version))
print("%16s %s" % ("Python Path:",os.path.dirname(sys.executable)))
print("%16s %s" % ("My Python Libs:",my_library))
print("%16s %s" % ("My Spark Dir:",my_spark))
print("%16s %s" % ("My Spark Ctx:",ctx.spark))
# print(ctx.spark)
# print(os.listdir(my_spark))
# print(sys.path)
# print("\n")
Example Dataframes:
In [16]:
# Row count shared by every example dataframe built below.
num = 10_000
In [17]:
# 2 columns: integer, string
# FIX: import random here so this cell works on Restart & Run All -- it previously
# relied on an import that only appears in a later cell (hidden kernel state).
import random
# One random car make per row. FIX: corrected 'Volkswagon' -> 'Volkswagen'.
lst_cars = [random.choice(['Honda','Toyota','Chevy','Ford','Tesla','Volkswagen','Hyundai','Jeep']) for x in range(num)]
# Build a two-column dataframe: a 1..num integer index and the make strings.
df2 = DFB.arrays_to_dataframe(
    [[int(x) for x in np.linspace(1, num, num)],
     lst_cars],
    ['index', 'make'])
# Preview the first few rows as pandas for rich display.
df2.limit(4).toPandas()
Out[17]:
In [33]:
# 5 columns: strings, ints, float
import random
# FIX: corrected 'Feburary' -> 'February' in the month list.
lst_months = ['January','February','March','April','May','June',
              'July','August','September','October','November','December']
# NOTE(review): no random seed is set, so this data differs on every run --
# seed random in the setup cell if reproducible examples are needed.
years = [random.randint(1900, 2019) for x in range(num)]
# print(years)
# Age relative to 2019, consistent with the year range above.
ages = [(2019 - x) for x in years]
# print(ages)
months = [random.choice(lst_months) for x in range(num)]
# print(months)
worth = [random.random() * 100000 for x in range(num)]
# print(worth)
lst_names = ['sarah','bill','steve','mary','alyssa','brian','elizabeth','josh','ryan','katie','connor','erica','lisa','doug']
names = [random.choice(lst_names) for x in range(num)]
# Assemble the five parallel lists into one dataframe.
df = DFB.arrays_to_dataframe([names, months, years, ages, worth],
                             ['name', 'month', 'year', 'age', 'net_worth'])
# Preview the first few rows as pandas for rich display.
df.limit(4).toPandas()
Out[33]:
In [18]:
# 10 columns: index(int), strings, ints, double
# Column names and their generators, kept side by side for readability.
col_names = ['index', 'vin', 'division', 'month', 'day', 'year',
             'hour', 'minute', 'second', 'price']
col_data = [
    [int(x) for x in np.linspace(1, num, num)],                 # index: 1..num
    DFB.build_array("string",  num=num, width=3),               # vin
    DFB.build_array("integer", num=num, nrange=(100, 999)),     # division
    DFB.build_array("integer", num=num, nrange=(1, 12)),        # month
    DFB.build_array("integer", num=num, nrange=(1, 28)),        # day
    DFB.build_array("integer", num=num, nrange=(2010, 2019)),   # year
    DFB.build_array("integer", num=num, nrange=(0, 23)),        # hour
    DFB.build_array("integer", num=num, nrange=(0, 59)),        # minute
    DFB.build_array("integer", num=num, nrange=(0, 59)),        # second
    DFB.build_array("double",  num=num, nrange=(1000, 100000)), # price
]
df1 = DFB.arrays_to_dataframe(col_data, col_names)
# Preview the first few rows as pandas for rich display.
df1.limit(4).toPandas()
Out[18]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: