spark2_dfanalysis

Dataframe analysis in PySpark

DataFrame Builder

Let’s construct some sample DataFrames.

The DataFrames built so far use the arrays_to_dataframe function, which takes a list of arrays and a matching list of column names and returns a DataFrame whose columns carry those names.
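The helper's source isn't shown in this notebook, but its likely behavior can be sketched in plain PySpark: transpose the column arrays into row tuples, then hand the rows and the name list to createDataFrame. A minimal sketch of that assumed behavior (the createDataFrame call is left commented since it needs a live SparkSession):

```python
# Sketch of what arrays_to_dataframe presumably does (assumed behavior --
# the helper's implementation is not shown here).
arrays = [['cfoibebi', 'chfdvbyb'], [1, 0], [10.008, 10.061]]
names = ['strings', 'integers', 'doubles']

# Transpose the column arrays into row tuples:
rows = list(zip(*arrays))
print(rows)  # [('cfoibebi', 1, 10.008), ('chfdvbyb', 0, 10.061)]

# With a SparkSession in hand, the DataFrame itself would then be:
# df = spark.createDataFrame(rows, names)
```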

Basic_Dataframes1
In [1]:
# My Standard Spark Session!

# Python libraries:
import os
import sys
import re
from dateutil import parser
# import datetime
from datetime import datetime
from datetime import date
import builtins
import json
import functools
import operator
from itertools import product

# Numpy & Pandas!
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:20,.2f}'.format
pd.options.display.max_columns = None

# Spark!
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import SparkSession, Row


spark = SparkSession.builder.appName("myapp").getOrCreate()

#     spark = SparkSession.builder.master("yarn")\
#     .config("spark.executor.instances", "32")\
#     .config("spark.executor.cores", "4")\
#     .config("spark.executor.memory", "4G")\
#     .config("spark.driver.memory", "4G")\
#     .config("spark.executor.memoryOverhead","4G")\
#     .config("spark.yarn.queue","Medium")\
#     .appName("myapp")\
#     .getOrCreate()

sc = spark.sparkContext
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.debug.maxToStringFields", "100")  # takes an int, not a boolean

Next, pull in the helper library and instantiate the DataFrameBuild class:

In [2]:
# mylib:
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
sys.path.append(my_library)
sys.path.append(my_spark)

from shared.app_context import *

ctx = ApplicationContext("Dev-Job")
print(ctx.spark)

from builder.DataFrameBuild import *

DFB = DataFrameBuild(ctx.spark)
<pyspark.sql.session.SparkSession object at 0x00000222E91D86A0>

Construct some arrays. Put them into a dataframe.

In [3]:
mystr = DFB.build_array("string",num=12,width=8)
myint = DFB.build_array("integer",num=12,nrange=(0,4))    # inclusive on range
mydoub = DFB.build_array("double",num=12,nrange=(10,10.1))

print(len(mystr),mystr)
print(len(myint),myint)
print(len(mydoub),mydoub)
12 ['cfoibebi', 'chfdvbyb', 'znilxbnh', 'rqmmayra', 'bwtittnc', 'bnsdpkny', 'kpbmgcrb', 'ielgmkxe', 'lxvpklpk', 'ajdyjcfs', 'bbjdsopd', 'zjxsrhey']
12 [1 1 0 3 1 1 0 3 2 0 2 2]
12 [10.00817341 10.06123776 10.05255773 10.04497975 10.05859685 10.07159241
 10.09669543 10.00859478 10.04120524 10.03277997 10.08164395 10.03410269]
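build_array's internals aren't shown either, but NumPy can produce comparable test columns. A hedged sketch (the function names below are illustrative stand-ins, not the helper's actual API):

```python
import string
import numpy as np

rng = np.random.default_rng(seed=42)
LETTERS = np.array(list(string.ascii_lowercase))

def rand_strings(num, width):
    # Random lowercase words, like build_array("string", num=..., width=...)
    return [''.join(rng.choice(LETTERS, size=width)) for _ in range(num)]

def rand_ints(num, lo, hi):
    # Inclusive on both ends, matching the nrange behaviour noted above
    return rng.integers(lo, hi + 1, size=num)

def rand_doubles(num, lo, hi):
    # Uniform doubles in [lo, hi)
    return rng.uniform(lo, hi, size=num)

mystr = rand_strings(12, 8)
myint = rand_ints(12, 0, 4)
mydoub = rand_doubles(12, 10, 10.1)
```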
In [4]:
df1 = DFB.arrays_to_dataframe([mystr,myint,mydoub,],['strings','integers','doubles'])
In [5]:
df1.show(5)
+--------+--------+------------------+
| strings|integers|           doubles|
+--------+--------+------------------+
|cfoibebi|       1|10.008173408053299|
|chfdvbyb|       1|10.061237755949197|
|znilxbnh|       0|10.052557727833053|
|rqmmayra|       3|10.044979748677436|
|bwtittnc|       1|10.058596847168099|
+--------+--------+------------------+
only showing top 5 rows

Do it again with more rows: build several arrays and put them into a single DataFrame.

In [6]:
num = 500
df4 = DFB.arrays_to_dataframe(
    [DFB.build_array("string",num=num,width=8),
     DFB.build_array("integer",num=num,nrange=(1,4)),
     DFB.build_array("integer",num=num,nrange=(1,12)),
     DFB.build_array("double",num=num,nrange=(0.0,10000))],
    ['passwords','quarter','month','price'])
In [7]:
df4.limit(10).toPandas()
Out[7]:
passwords quarter month price
0 jicyulnb 1 12 8,559.12
1 bkmmtofn 1 7 1,392.53
2 gxiqnuxl 2 9 5,729.57
3 mpxvisme 1 10 6,368.27
4 vgpafomt 2 3 2,779.55
5 znzylnzt 1 7 1,837.04
6 zjvigovw 1 6 9,486.31
7 faokwsbz 2 6 5,604.54
8 johcdjco 3 8 1,887.84
9 sjiioevs 3 10 8,141.57
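With quarter, month, and price columns in place, a grouped aggregate is a natural next step. In PySpark that would be something like df4.groupBy('quarter').agg(avg('price')); the same idea in pandas, on a small made-up sample standing in for df4:

```python
import pandas as pd

# Tiny hand-made sample in place of df4.limit(...).toPandas()
pdf = pd.DataFrame({
    'quarter': [1, 1, 2, 2, 3],
    'price':   [100.0, 200.0, 300.0, 500.0, 250.0],
})

# Average price per quarter -- the pandas analogue of
# df4.groupBy('quarter').agg(avg('price')) in PySpark.
avg_price = pdf.groupby('quarter')['price'].mean()
print(avg_price)
```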