import collections

from pyspark.sql import SparkSession, functions
from pyspark.sql.functions import *  # brings in min, max, sum, array, split, length, grouping, grouping_id, ...
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

# spark is assumed to be an existing SparkSession, e.g.:
# spark = SparkSession.builder.appName("dataframe-examples").getOrCreate()

# sample_df, sample_df2, ldf, rdf
# sample DataFrame 1
Person = collections.namedtuple('Person', 'name age job')
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)
# sample DataFrame 2
d1 = ("store2", "note", 20, 2000)
d2 = ("store2", "bag", 10, 5000)
d3 = ("store1", "note", 15, 1000)
d4 = ("store1", "pen", 20, 5000)
sample_df2 = spark.createDataFrame([d1, d2, d3, d4]).toDF("store", "product", "amount", "price")
# sample word DataFrames for the join/na examples
# (the Word namedtuple is not defined above; the 'word count' field names are assumed from the joins below)
Word = collections.namedtuple('Word', 'word count')
ldf = spark.createDataFrame([Word("w1", 1), Word("w2", 1)])
rdf = spark.createDataFrame([Word("w1", 1), Word("w3", 1)])
# runMaxMin(spark, df)
def runMaxMin(spark, df):
    min_col = min("age")
    max_col = max("age")
    df.select(min_col, max_col).show()
# runMaxMin(spark, sample_df)
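# A minimal equivalent sketch: the same min/max through agg with explicit aliases
# (min/max here are pyspark.sql.functions, brought in by the imports at the top).
sample_df.agg(functions.min("age").alias("min_age"), functions.max("age").alias("max_age")).show()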
# runAggregateFunctions(spark, df1, df2) - aggregate functions
def runAggregateFunctions(spark, df1, df2):
    # collect_list, collect_set
    doubledDf1 = df1.union(df1)
    doubledDf1.select(functions.collect_list(doubledDf1["name"])).show(truncate=False)
    doubledDf1.select(functions.collect_set(doubledDf1["name"])).show(truncate=False)
    # count, countDistinct
    doubledDf1.select(functions.count(doubledDf1["name"]),
                      functions.countDistinct(doubledDf1["name"])).show(truncate=False)
    # sum
    df2.printSchema()
    df2.select(sum(df2["price"])).show(truncate=False)
    # grouping, grouping_id
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping(df2["store"])).show(truncate=False)
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]),
                                               grouping_id(df2["store"], df2["product"])).show(truncate=False)
runAggregateFunctions(spark, sample_df, sample_df2)
# runAggregateFunctions output
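# A small follow-up sketch: per-store aggregates with aliases so the output columns
# get readable names (functions.sum/avg are standard aggregate functions).
sample_df2.groupBy("store").agg(functions.sum("price").alias("total_price"),
                                functions.avg("amount").alias("avg_amount")).show()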
# runOtherFunctions(spark, personDf)
def runOtherFunctions(spark, personDf):
    df = spark.createDataFrame([("v1", "v2", "v3")], ["c1", "c2", "c3"])
    # array
    df.select(df.c1, df.c2, df.c3, array("c1", "c2", "c3").alias("newCol")).show(truncate=False)
    # desc, asc
    personDf.show()
    personDf.sort(functions.desc("age"), functions.asc("name")).show()
    # pyspark 2.1.0 does not support desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last
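    # Hedged note: on Spark 2.4+ the null-ordering variants do exist, e.g.
    # personDf.sort(functions.desc_nulls_last("age"), functions.asc_nulls_first("name")).show()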
    # split, length (in pyspark a column can be referenced as df["col"] or df.col)
    df2 = spark.createDataFrame([("Splits str around pattern",)], ['value'])
    df2.select(df2.value, split(df2.value, " "), length(df2.value)).show(truncate=False)
    # rownum, rank
    f1 = StructField("date", StringType(), True)
    f2 = StructField("product", StringType(), True)
    f3 = StructField("amount", IntegerType(), True)
    schema = StructType([f1, f2, f3])
    p1 = ("2017-12-25 12:01:00", "note", 1000)
    p2 = ("2017-12-25 12:01:10", "pencil", 3500)
    p3 = ("2017-12-25 12:03:20", "pencil", 23000)
    p4 = ("2017-12-25 12:05:00", "note", 1500)
    p5 = ("2017-12-25 12:05:07", "note", 2000)
    p6 = ("2017-12-25 12:06:25", "note", 1000)
    p7 = ("2017-12-25 12:08:00", "pencil", 500)
    p8 = ("2017-12-25 12:09:45", "note", 30000)
    dd = spark.createDataFrame([p1, p2, p3, p4, p5, p6, p7, p8], schema)
    w1 = Window.partitionBy("product").orderBy("amount")
    w2 = Window.orderBy("amount")
    dd.select(dd.product, dd.amount, functions.row_number().over(w1).alias("rownum"),
              functions.rank().over(w2).alias("rank")).show()
runOtherFunctions(spark, sample_df)
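# A hedged follow-up sketch: dense_rank next to rank over the same window, reusing
# sample_df2 so the snippet stays self-contained.
w = Window.partitionBy("store").orderBy("price")
sample_df2.select("store", "product", "price",
                  functions.rank().over(w).alias("rank"),
                  functions.dense_rank().over(w).alias("dense_rank")).show()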
# runAgg(spark, df)
def runAgg(spark, df):
    df.agg(max("amount"), min("price")).show()
    df.agg({"amount": "max", "price": "min"}).show()
runAgg(spark, sample_df2)
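# A short extra sketch: the dict form of agg allows only one aggregate per column,
# so multiple aggregates over the same column need the column-function form.
sample_df2.agg(functions.max("price"), functions.min("price"), functions.avg("price")).show()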
# runDfAlias(spark, df)
def runDfAlias(spark, df):
    df.select(df["product"]).show()
    df.alias("aa").select("aa.product").show()
runDfAlias(spark, sample_df2)
# runGroupBy(spark, df)
def runGroupBy(spark, df):
    df.groupBy("store", "product").agg({"price": "sum"}).show()
# runGroupBy(spark, sample_df2)
# runCube(spark, df)
def runCube(spark, df):
    df.cube("store", "product").agg({"price": "sum"}).show()
# runCube(spark, sample_df2)
# runDistinct(spark)
def runDistinct(spark):
    d1 = ("store1", "note", 20, 2000)
    d2 = ("store1", "bag", 10, 5000)
    d3 = ("store1", "note", 20, 2000)
    rows = [d1, d2, d3]
    cols = ["store", "product", "amount", "price"]
    df = spark.createDataFrame(rows, cols)
    df.distinct().show()
    df.dropDuplicates(["store"]).show()
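    # Note: dropDuplicates(["store"]) keeps one arbitrary row per store value;
    # which of the duplicate rows survives is not guaranteed.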
# runDistinct(spark)
# runDrop(spark, df)
def runDrop(spark, df):
    df.drop(df["store"]).show()
# runDrop(spark, sample_df2)
# runIntersect(spark)
def runIntersect(spark):
    a = spark.range(1, 5)
    b = spark.range(2, 6)
    c = a.intersect(b)
    c.show()
# runIntersect(spark)
# runExcept(spark)
def runExcept(spark):
    df1 = spark.range(1, 6)
    df2 = spark.createDataFrame([(2,), (4,)], ['value'])
    # in Python, use the subtract method instead of except
    # subtract behaves the same as except
    df1.subtract(df2).show()
# runExcept(spark)
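# Hedged note: Spark 2.4+ also provides DataFrame.exceptAll and DataFrame.intersectAll,
# which keep duplicate rows, whereas subtract/intersect above return distinct rows, e.g.
# spark.range(1, 6).exceptAll(spark.createDataFrame([(2,), (4,)], ["id"])).show()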
# runJoin(spark, ldf, rdf)
def runJoin(spark, ldf, rdf):
    joinTypes = "inner,outer,leftouter,rightouter,leftsemi".split(",")
    for joinType in joinTypes:
        print("============= %s ===============" % joinType)
        ldf.join(rdf, ["word"], joinType).show()
# runJoin(spark, ldf, rdf)
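# A small additional sketch: joining on an explicit column condition instead of the
# column-name list; this form keeps both "word" columns in the result.
ldf.join(rdf, ldf["word"] == rdf["word"], "inner").show()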
# runNa(spark, ldf, rdf)
def runNa(spark, ldf, rdf):
    result = ldf.join(rdf, ["word"], "outer").toDF("word", "c1", "c2")
    result.show()
    # in Python, either na.drop or dropna can be used
    # a row is dropped when the number of non-null values among c1 and c2 is less than thresh
    # with thresh=1, a row is kept in the result as long as at least one of c1 or c2
    # has a non-null value
    result.na.drop(thresh=2, subset=["c1", "c2"]).show()
    result.dropna(thresh=2, subset=["c1", "c2"]).show()
    # fill
    result.na.fill({"c1": 0}).show()
    # in Python, replace can be performed by passing a dictionary as to_replace
    # (in that case the value argument is ignored); without a dictionary, pass a list of
    # keys (first argument) and a list of values (second argument)
    result.na.replace(to_replace={"w1": "word1", "w2": "word2"}, value="", subset="word").show()
    result.na.replace(["w1", "w2"], ["word1", "word2"], "word").show()
# runNa(spark, ldf, rdf)
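# A short follow-up to the thresh comment above: with thresh=1 a row survives as long as
# at least one of c1/c2 is non-null, so only rows where both are null would be dropped.
na_df = ldf.join(rdf, ["word"], "outer").toDF("word", "c1", "c2")
na_df.na.drop(thresh=1, subset=["c1", "c2"]).show()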
# runOrderBy(spark)
def runOrderBy(spark):
    df = spark.createDataFrame([(3, "z"), (10, "a"), (5, "c")], ["idx", "name"])
    df.orderBy("name", "idx").show()
    df.orderBy("idx", "name").show()
# runOrderBy(spark)
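# A short extra sketch: descending order via the ascending flag or a column's desc().
sample_df2.orderBy("price", ascending=False).show()
sample_df2.orderBy(sample_df2["price"].desc(), "store").show()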
# runRollup(spark, df)
def runRollup(spark, df):
    df.rollup("store", "product").agg({"price": "sum"}).show()
runRollup(spark, sample_df2)
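# Note: rollup("store", "product") produces only the hierarchical groupings
# (store, product), (store) and the grand total, while cube also adds (product).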
# runStat(spark)
def runStat(spark):
    df = spark.createDataFrame([("a", 6), ("b", 4), ("c", 12), ("d", 6)], ["word", "count"])
    df.show()
    df.stat.crosstab("word", "count").show()
runStat(spark)
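# Hedged sketch of other DataFrameStatFunctions next to crosstab: freqItems and corr
# (corr needs numeric columns, so a small numeric frame is built here).
stat_df = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 31.0)], ["x", "y"])
stat_df.stat.freqItems(["x"]).show(truncate=False)
print(stat_df.stat.corr("x", "y"))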
# runWithColumn(spark)
def runWithColumn(spark):
    df1 = spark.createDataFrame([("prod1", "100"), ("prod2", "200")], ["pname", "price"])
    df2 = df1.withColumn("dcprice", df1["price"] * 0.9)
    df3 = df2.withColumnRenamed("dcprice", "newprice")
    df1.show()
    df2.show()
    df3.show()
# runWithColumn(spark)
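# Hedged follow-up sketch: "price" above is created as a string column, so an explicit
# cast makes the numeric type visible in the schema before the arithmetic.
price_df = spark.createDataFrame([("prod1", "100"), ("prod2", "200")], ["pname", "price"])
price_df = price_df.withColumn("price", price_df["price"].cast("double"))
price_df.printSchema()
price_df.withColumn("dcprice", price_df["price"] * 0.9).show()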