CLASS/Spark,Hadoop,Docker,Data Visualization

[빅데이터분산컴퓨팅] 2022.10.18 pyspark word count 예제

sseni 2022. 10. 18. 15:12

# pyspark 실행 확인

C:\Users\Administrator>pyspark


[실습1]

- word-count.py

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("wordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///C:/Users/Administrator/SparkCourse/in/word_count.text")
words = input.flatMap(lambda x : x.split())
wordCounts = words.countByValue()

for word, count in wordCounts.items(): # clean step
    cleanWord = word.encode("ascii", "ignore") # ascii 제외하고 모두 무시, bool 형
    
    
    if(cleanWord):  
    	print(cleanWord.decode() + " " + str(count))

sparkconf : 스파크 환경설정

sparkContext : 전에 설정한 sparkconf의 내용 저장

 

진행할 과정

Token 와 Clean 차이점 : ,(콤마)와 .(점)을 삭제, 주목적은 word count이기 때문에


>> spyder에서 실행 안 될 시, 

C:\Users\Administrator\SparkCourse>spark-submit word-count.py

>> AttributeError: module 'pyspark.rdd' has no attribute 'V'  에러

spyder에서의 pyspark를 최신 버전으로 다운로드해서 문제가 됐다. !pip install pyspark==3.1.3 으로 설치할 것.


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = (SparkSession.builder.appName("Words Count Example").getOrCreate())
spark.sparkContext
spark.sparkContext.setLogLevel("ERROR")

 

lower : 소문자로 

regexp_extract  : 정규식으로 만들 때


[실습2]

i) spyder 주석 처리 단축키 >> ctrl + 1

# book 읽어오기

- word_count1.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = (SparkSession.builder.appName("Words Count Example").getOrCreate())

print(spark.sparkContext)

spark.sparkContext.setLogLevel("ERROR")

book = spark.read.text("file:///C:/Users/Administrator/python-spark-data2/gutenberg_books/1342-0.txt")
print(book)

book.printSchema()
print(book.dtypes)

book.show()  # 기본 20행 20문자
book.show(10, truncate=50) # 10행 50문자

lines = book.select(split(book.value, " ").alias("line"))
lines.show(5)

alias : 단순 출력용으로 이름 지정하기

lines.show(5, truncate=50)

맨 위 book.show() 와 다르게 대괄호 [] 가 붙어있음 

>> 모두 같은 의미!!

book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")

# book Word Count

words = lines.select(explode(col("line")).alias("word"))
words.show(15)

explode : 단어 별로 

words_lower = words.select(lower(col("word")).alias("word_lower")) # 소문자
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))
words_clean.show()

words_nonull = words_clean.filter(col("word") != "")
words_nonull.show()

groups = words_nonull.groupby(col("word"))
results = words_nonull.groupby(col("word")).count()
print(results)
results.show()

results.orderBy("count", ascending=False).show(10)
results.orderBy(col("count").desc()).show(10)

results.orderBy("word").show(10)  # 빈도 상관없이 사전순으로 show

# results.coalesce(1).write.csv("file:///C:/Users/Administrator/python-spark-data2/simple_count1.csv")

안됨. csv가 아니라 폴더 형식으로 생성됨

 

>> After

import pyspark.sql.functions as F

results = (
    spark.read.text("file:///C:/Users/Administrator/python-spark-data2/gutenberg_books/1342-0.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby(F.col("word"))
    .count()
)

results.orderBy("count", ascending=False).show(10)

 

300x250