Scroll indicator done
728x90

## dataframe_sample.py, word.py

class Word:
    def __init__(self, word, count):
        self.word = word
        self.count = count

 

# createDataFrame()

import collections

from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

from word import Word

# Session essential : builder, getOrCreate
spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

sc = spark.sparkContext

def createDataFrame(spark, sc):
    sparkHomeDir = "file:/home/master/spark-2.4.8-bin-hadoop2.7"

    # 1. create file
    df1 = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    # df1.show()
    df2 = spark.read.parquet(sparkHomeDir + "/examples/src/main/resources/users.parquet")
    # df2.show()
    df3 = spark.read.text(sparkHomeDir + "/examples/src/main/resources/people.txt")
    # df3.show()

    # designate schema
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("hayoon" , 7, "student")
    r2 = ("sunwoo", 13, "student")
    r3 = ("hajoo", 5, "kindergartener")
    r4 = ("jinwoo", 13, "student")
    rows = [r1, r2, r3, r4]
    df6 = spark.createDataFrame(rows, schema)
    df6.show()

createDataFrame(spark, sc)

df1~3 결과
스키마 지정

- show (row 개수, column 길이 자르는 여부)

 

# runBasicOpsEx()

# sample dataFrame 1
Person = collections.namedtuple('Person', 'name age job')
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)

def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema

runBasicOpsEx(spark, sc, sample_df)

runBasicOpsEx 결과

 

# runBasicOpsEx() - dataFrame sql 처리(spark.sql), 실행 계획 정보 출력(explain)

def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema
    df.createOrReplaceTempView("users")
    spark.sql("select name, age from users where age > 20").show()
    spark.sql("select name, age from users where age > 20").explain()

 

# runColumnEx()

def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()
    
runColumnEx(spark, sc, sample_df)

 

# runAlias() - column 이름 별칭 부여 시 사용

def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()

runAlias(spark, sc, sample_df)

 

# runIsinEx() - filter 와 동일하게 적용

def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1,3,5,7,9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()

runIsinEx(spark, sc)

 

# runWhenEx()

def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()

runWhenEx(spark, sc)

 

# runCollectionFunctions()

def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")
    # array_Contains, size
    df.select(arrayCol, array_contains(arrayCol, '2'), size(arrayCol)).show(truncate=False)
    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)
    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)

# runCollectionFunctions(spark)

 

# runDateFunctions()

def runDateFunctions(spark):
    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])

    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([r2,r3,r4,r5,r6,r7,r8,r9], schema2);
    timeCol = unix_timestamp(dd["date"]).cast("timestamp");
    windowCol = window(timeCol, "5 minutes")
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False);

runDateFunctions(spark)

 

# runUDF()

def runUDF(spark, df): # user define function
    # function을 이용한 등록
    fn1 = functions.udf(lambda job: job == "student")
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # SparkSession을 이용한 등록
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()

runUDF(spark, sample_df)

 

 

###total code###

import collections

from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

from word import Word

# Session essential : builder, getOrCreate
spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

sc = spark.sparkContext

def createDataFrame(spark, sc):
    sparkHomeDir = "file:/home/master/spark-2.4.8-bin-hadoop2.7"

    # 1. create file
    df1 = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    # df1.show()
    df2 = spark.read.parquet(sparkHomeDir + "/examples/src/main/resources/users.parquet")
    # df2.show()
    df3 = spark.read.text(sparkHomeDir + "/examples/src/main/resources/people.txt")
    # df3.show()

    # designate schema
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("hayoon" , 7, "student")
    r2 = ("sunwoo", 13, "student")
    r3 = ("hajoo", 5, "kindergartener")
    r4 = ("jinwoo", 13, "student")
    rows = [r1, r2, r3, r4]
    df6 = spark.createDataFrame(rows, schema)
    df6.show()

# createDataFrame(spark, sc)

# sample dataFrame 1
Person = collections.namedtuple('Person', 'name age job')
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)

# dataFrame information
def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema
    df.createOrReplaceTempView("users")
    spark.sql("select name, age from users where age <= 5").show()
    spark.sql("select name, age from users where age > 20").explain()

# runBasicOpsEx(spark, sc, sample_df)

def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()

# runColumnEx(spark, sc, sample_df)

def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()

# runAlias(spark, sc, sample_df)

def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1,3,5,7,9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()

# runIsinEx(spark, sc)

def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()

# runWhenEx(spark, sc)

def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")
    # array_Contains, size
    df.select(arrayCol, array_contains(arrayCol, 2), size(arrayCol)).show(truncate=False)
    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)
    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)

# runCollectionFunctions(spark)

def runDateFunctions(spark):
    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])

    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([r2,r3,r4,r5,r6,r7,r8,r9], schema2);
    timeCol = unix_timestamp(dd["date"]).cast("timestamp");
    windowCol = window(timeCol, "5 minutes")
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False);

# runDateFunctions(spark)

def runUDF(spark, df): # User Define Function
    # function
    fn1 = functions.udf(lambda job: job == "student")
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # SparkSession
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()

# runUDF(spark, sample_df)

# putty

20.194.22.56

hadoopuser

dilab9196

 

spark_home에

cd $SPARK_HOME

cd sbin

ls

대시보드 화면 20.194.22.56:8080

./stop-all.sh (하지마셈 스파크 서버 종료시킴)

jps

intellij terminal 에 ./spark-submit --master spark://20.194.22.56:7077 /home/master/IdeaProjects/Hello/dataframe_sample.py

spark-submit --excutor-cores 10 --executor-memory 1GB  // 본인이 사용할 core 지정

 

728x90