<!DOCTYPE html>
Join two DataFrames
In [1]:
# Auto-reload edited local modules into the running kernel on each execution.
%load_ext autoreload
%autoreload 2
# The autoreload extension is already loaded. To reload it, use:
# %reload_ext autoreload
In [2]:
import os
import sys
import numpy as np
import pandas as pd
# Render pandas floats right-aligned in 8 chars, comma-grouped, 2 decimals.
pd.options.display.float_format = '{:8,.2f}'.format
In [3]:
# mylib: make the personal config and spark-analysis repos importable.
my_library = os.path.expanduser('~/.myconfigs')
my_spark = os.path.expanduser('~/spark2_dfanalysis')
# Guard against duplicate sys.path entries when this cell is re-run
# (the original appended unconditionally on every execution).
for _extra_path in (my_library, my_spark):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)
In [4]:
# NOTE(review): `import pyspark as spark` binds the MODULE to the name
# `spark`; the commented-out SparkSession below would have rebound that
# same name to a session object -- confusing reuse of one name.
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# WARNING: this wildcard import shadows Python builtins such as
# sum/max/min/abs/round with Spark Column functions of the same name.
from pyspark.sql.functions import *
# spark = SparkSession.builder.appName('myappname').getOrCreate()
# print(spark)
In [5]:
# Scratch cell: uncomment to explore the pyspark API surface.
# NOTE(review): only the alias `spark` is bound above, not `pyspark`;
# these lines would need `import pyspark` (or `dir(spark)`) to run.
# print(dir(pyspark))
# print(dir(pyspark.sql))
# print(dir(pyspark.rdd))
In [6]:
# Build the shared application context; the wildcard import brings
# ApplicationContext into scope. It exposes a SparkSession as ctx.spark
# (presumably created inside the helper -- verify in shared/app_context).
from shared.app_context import *
ctx = ApplicationContext("Dev-Job")
print(ctx.spark)
In [7]:
# print(sys.path)
# DataFrameBuild: local helper for generating synthetic Spark DataFrames
# (imported via wildcard from the builder package added to sys.path above).
from builder.DataFrameBuild import *
In [8]:
# Builder instance bound to the context's SparkSession.
# NOTE(review): `x` is re-created again in the DF1/DF2 cells below.
x = DataFrameBuild(ctx.spark)
DF1: build the first synthetic DataFrame (500 rows)
In [22]:
# Build DF1: num rows of synthetic columns via the DataFrameBuild helper.
x = DataFrameBuild(ctx.spark)
num = 500
# NOTE: the index column is simply 1..num. The original built it with
# `[int(x) for x in np.linspace(1, num, num)]`, whose loop variable `x`
# shadowed the builder instance `x` -- equivalent, but a bug waiting to
# happen; `range` is clearer and already integer-valued.
df1 = x.arrays_to_dataframe(
    [list(range(1, num + 1)),                            # index: 1..num
     x.build_array("string", num=num, width=8),          # random 8-char strings
     x.build_array("integer", num=num, nrange=(1, 4)),   # quarter: 1-4
     x.build_array("integer", num=num, nrange=(1, 12)),  # month: 1-12
     x.build_array("double", num=num, nrange=(0.0, 10000))],  # price
    ['index', 'passwords', 'quarter', 'month', 'price'])
DF2: build the second synthetic DataFrame (500 rows)
In [23]:
# Build DF2: same schema as DF1 but fresh random data.
x = DataFrameBuild(ctx.spark)
num = 500
# NOTE: `range` replaces the original `[int(x) for x in np.linspace(...)]`,
# whose loop variable `x` shadowed the builder instance `x`.
df2 = x.arrays_to_dataframe(
    [list(range(1, num + 1)),                            # index: 1..num
     x.build_array("string", num=num, width=8),          # random 8-char strings
     x.build_array("integer", num=num, nrange=(1, 4)),   # quarter: 1-4
     x.build_array("integer", num=num, nrange=(1, 12)),  # month: 1-12
     x.build_array("double", num=num, nrange=(0.0, 10000))],  # price
    ['index', 'passwords', 'quarter', 'month', 'price'])
In [24]:
# Peek at the first 10 rows of DF1.
df1.limit(10).show()
In [25]:
# Peek at the first 10 rows of DF2.
df2.limit(10).show()
Join df1 and df2 on indices.
In [60]:
# Inner-join DF1 and DF2 on their index columns.
# The original cell performed the join twice -- once with dot-attribute
# access, then overwrote the result with the equivalent bracket form;
# only the bracket form is kept (it also avoids colliding with the
# DataFrame.index attribute namespace).
# NOTE: because both frames share every column name, the result keeps
# BOTH copies (two 'index' columns, two 'passwords', ...), so referring
# to 'index' on df_index is ambiguous; see the aliased join below.
df_index = df1.join(df2, df1["index"] == df2["index"])
In [61]:
%%time
# NOTE: cache() is lazy -- it only marks the plan for caching; nothing is
# materialized (or meaningfully timed) until an action such as show() runs.
df_index.cache()
Out[61]:
In [62]:
# Materialize and display 20 rows of the joined frame.
df_index.show(20)
In [58]:
# Column list -- both frames' columns appear, including two 'index' columns.
df_index.columns
Out[58]:
Join df1 and df2 on indices (round 2, using DataFrame aliases).
In [71]:
# Repeat the join, this time disambiguating the shared column names with
# DataFrame aliases: "o" for the first frame, "t" for the second.
df_i2 = (
    df1.alias("o")
       .join(df2.alias("t"), col("o.index") == col("t.index"))
)
In [72]:
%%time
# Lazy: marks df_i2 for caching; the cost is paid on the first action.
df_i2.cache()
Out[72]:
In [73]:
# Display 20 rows of the alias-based join.
df_i2.show(20)
In [74]:
# Aliasing disambiguates references, but the result still carries both
# frames' columns (each name appears twice).
df_i2.columns
Out[74]:
Create a new column calculated from current column values.
In [ ]: