2021.06.28 Spark Programming for Big Data Analysis - DataFrame, DataSet
## dataframe_sample.py, word.py
# word.py
class Word:
    def __init__(self, word, count):
        self.word = word
        self.count = count
# dataframe_sample.py
# createDataFrame()
import collections
from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time
from word import Word
# Session essentials: builder, getOrCreate
spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()
sc = spark.sparkContext
def createDataFrame(spark, sc):
    sparkHomeDir = "file:/home/master/spark-2.4.8-bin-hadoop2.7"
    # 1. create DataFrames from files
    df1 = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    # df1.show()
    df2 = spark.read.parquet(sparkHomeDir + "/examples/src/main/resources/users.parquet")
    # df2.show()
    df3 = spark.read.text(sparkHomeDir + "/examples/src/main/resources/people.txt")
    # df3.show()
    # 2. create a DataFrame with an explicit schema
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("hayoon", 7, "student")
    r2 = ("sunwoo", 13, "student")
    r3 = ("hajoo", 5, "kindergartener")
    r4 = ("jinwoo", 13, "student")
    rows = [r1, r2, r3, r4]
    df6 = spark.createDataFrame(rows, schema)
    df6.show()
createDataFrame(spark, sc)
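As an aside, the Word class imported above is not used in this section, but createDataFrame() can typically build a DataFrame straight from such plain Python objects by inspecting their attributes. A minimal sketch (the sample words are made up, not from the original code):
# hypothetical example: infer the schema from Word instances defined in word.py
df_words = spark.createDataFrame([Word("spark", 3), Word("python", 5)])
df_words.show()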
- show(n, truncate): n is the number of rows to print, and truncate controls whether long column values are cut off
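For example, with a small throwaway DataFrame (an illustration, not part of the original script):
demo_df = spark.createDataFrame([("hayoon", "a fairly long job title"), ("sunwoo", "student")], ["name", "job"])
demo_df.show()                   # defaults: up to 20 rows, long values truncated to 20 characters
demo_df.show(1, truncate=False)  # only the first row, without truncating long values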
# runBasicOpsEx()
# sample dataFrame 1
Person = collections.namedtuple('Person', 'name age job')
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)
def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema
runBasicOpsEx(spark, sc, sample_df)
# runBasicOpsEx() - running SQL over a DataFrame (spark.sql) and printing the execution plan (explain)
def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema
    df.createOrReplaceTempView("users")
    spark.sql("select name, age from users where age > 20").show()
    spark.sql("select name, age from users where age > 20").explain()
# runColumnEx()
def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()
runColumnEx(spark, sc, sample_df)
# runAlias() - used to assign an alias to a column name
def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()
runAlias(spark, sc, sample_df)
# runIsinEx() - isin() works the same way as a filter
def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1, 3, 5, 7, 9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()
runIsinEx(spark, sc)
# runWhenEx()
def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()
runWhenEx(spark, sc)
# runCollectionFunctions()
def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")
    # array_contains, size
    df.select(arrayCol, array_contains(arrayCol, '2'), size(arrayCol)).show(truncate=False)
    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)
    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)
runCollectionFunctions(spark)
# runDateFunctions()
def runDateFunctions(spark):
    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])
    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)
    dd = spark.createDataFrame([r2, r3, r4, r5, r6, r7, r8, r9], schema2)
    timeCol = unix_timestamp(dd["date"]).cast("timestamp")
    windowCol = window(timeCol, "5 minutes")
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False)
runDateFunctions(spark)
# runUDF()
def runUDF(spark, df):  # user-defined function
    # register via functions.udf
    fn1 = functions.udf(lambda job: job == "student")
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # register via spark.udf.register on the SparkSession
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()
runUDF(spark, sample_df)
## Total code
import collections
from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time
from word import Word
# Session essentials: builder, getOrCreate
spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()
sc = spark.sparkContext
def createDataFrame(spark, sc):
    sparkHomeDir = "file:/home/master/spark-2.4.8-bin-hadoop2.7"
    # 1. create DataFrames from files
    df1 = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    # df1.show()
    df2 = spark.read.parquet(sparkHomeDir + "/examples/src/main/resources/users.parquet")
    # df2.show()
    df3 = spark.read.text(sparkHomeDir + "/examples/src/main/resources/people.txt")
    # df3.show()
    # 2. create a DataFrame with an explicit schema
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("hayoon", 7, "student")
    r2 = ("sunwoo", 13, "student")
    r3 = ("hajoo", 5, "kindergartener")
    r4 = ("jinwoo", 13, "student")
    rows = [r1, r2, r3, r4]
    df6 = spark.createDataFrame(rows, schema)
    df6.show()
# createDataFrame(spark, sc)
# sample dataFrame 1
Person = collections.namedtuple('Person', 'name age job')
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)
# dataFrame information
def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    df.columns
    df.dtypes
    df.schema
    df.createOrReplaceTempView("users")
    spark.sql("select name, age from users where age <= 5").show()
    spark.sql("select name, age from users where age > 20").explain()
# runBasicOpsEx(spark, sc, sample_df)
def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()
# runColumnEx(spark, sc, sample_df)
def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()
# runAlias(spark, sc, sample_df)
def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1, 3, 5, 7, 9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()
# runIsinEx(spark, sc)
def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()
# runWhenEx(spark, sc)
def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")
    # array_contains, size
    df.select(arrayCol, array_contains(arrayCol, '2'), size(arrayCol)).show(truncate=False)
    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)
    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)
# runCollectionFunctions(spark)
def runDateFunctions(spark):
    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])
    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)
    dd = spark.createDataFrame([r2, r3, r4, r5, r6, r7, r8, r9], schema2)
    timeCol = unix_timestamp(dd["date"]).cast("timestamp")
    windowCol = window(timeCol, "5 minutes")
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False)
# runDateFunctions(spark)
def runUDF(spark, df):  # user-defined function
    # register via functions.udf
    fn1 = functions.udf(lambda job: job == "student")
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # register via spark.udf.register on the SparkSession
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()
# runUDF(spark, sample_df)
## putty / spark-submit
Host: 20.194.22.56, user: hadoopuser, password: dilab9196
Go to SPARK_HOME:
cd $SPARK_HOME
cd sbin
ls
Dashboard UI: 20.194.22.56:8080
./stop-all.sh (do not run this - it shuts down the Spark server)
jps
In the IntelliJ terminal: ./spark-submit --master spark://20.194.22.56:7077 /home/master/IdeaProjects/Hello/dataframe_sample.py
spark-submit --executor-cores 10 --executor-memory 1g  # specify the cores and memory your job will use
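Putting those options together, a full submit command could look like this (a sketch; the core and memory values are only examples):
./spark-submit \
    --master spark://20.194.22.56:7077 \
    --executor-cores 10 \
    --executor-memory 1g \
    /home/master/IdeaProjects/Hello/dataframe_sample.py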