# pyspark 실행 확인
C:\Users\Administrator>pyspark
[실습1]
- word-count.py
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("wordCount")
sc = SparkContext(conf = conf)
input = sc.textFile("file:///C:/Users/Administrator/SparkCourse/in/word_count.text")
words = input.flatMap(lambda x : x.split())
wordCounts = words.countByValue()
for word, count in wordCounts.items(): # clean step
cleanWord = word.encode("ascii", "ignore") # ascii 제외하고 모두 무시, bool 형
if(cleanWord):
print(cleanWord.decode() + " " + str(count))
sparkconf : 스파크 환경설정
sparkContext : 전에 설정한 sparkconf의 내용 저장
Token 와 Clean 차이점 : ,(콤마)와 .(점)을 삭제, 주목적은 word count이기 때문에
>> spyder에서 실행 안 될 시,
C:\Users\Administrator\SparkCourse>spark-submit word-count.py
>> AttributeError: module 'pyspark.rdd' has no attribute 'V' 에러
spyder에서의 pyspark를 최신 버전으로 다운로드해서 문제가 됐다. !pip install pyspark==3.1.3 으로 설치할 것.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract
spark = (SparkSession.builder.appName("Words Count Example").getOrCreate())
spark.sparkContext
spark.sparkContext.setLogLevel("ERROR")
lower : 소문자로
regexp_extract : 정규식으로 만들 때
[실습2]
i) spyder 주석 처리 단축키 >> ctrl + 1
# book 읽어오기
- word_count1.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract
spark = (SparkSession.builder.appName("Words Count Example").getOrCreate())
print(spark.sparkContext)
spark.sparkContext.setLogLevel("ERROR")
book = spark.read.text("file:///C:/Users/Administrator/python-spark-data2/gutenberg_books/1342-0.txt")
print(book)
book.printSchema()
print(book.dtypes)
book.show() # 기본 20행 20문자
book.show(10, truncate=50) # 10행 50문자
lines = book.select(split(book.value, " ").alias("line"))
lines.show(5)
alias : 단순 출력용으로 이름 지정하기
lines.show(5, truncate=50)
맨 위 book.show() 와 다르게 대괄호 [] 가 붙어있음
>> 모두 같은 의미!!
book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")
# book Word Count
words = lines.select(explode(col("line")).alias("word"))
words.show(15)
explode : 단어 별로
words_lower = words.select(lower(col("word")).alias("word_lower")) # 소문자
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))
words_clean.show()
words_nonull = words_clean.filter(col("word") != "")
words_nonull.show()
groups = words_nonull.groupby(col("word"))
results = words_nonull.groupby(col("word")).count()
print(results)
results.show()
results.orderBy("count", ascending=False).show(10)
results.orderBy(col("count").desc()).show(10)
results.orderBy("word").show(10) # 빈도 상관없이 사전순으로 show
# results.coalesce(1).write.csv("file:///C:/Users/Administrator/python-spark-data2/simple_count1.csv")
안됨. csv가 아니라 폴더 형식으로 생성됨
>> After
import pyspark.sql.functions as F
results = (
spark.read.text("file:///C:/Users/Administrator/python-spark-data2/gutenberg_books/1342-0.txt")
.select(F.split(F.col("value"), " ").alias("line"))
.select(F.explode(F.col("line")).alias("word"))
.select(F.lower(F.col("word")).alias("word"))
.select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
.where(F.col("word") != "")
.groupby(F.col("word"))
.count()
)
results.orderBy("count", ascending=False).show(10)
'CLASS > Spark,Hadoop,Docker,Data Visualization' 카테고리의 다른 글
[빅데이터분산컴퓨팅] 2022.11.01 numpy, pandas, seaborn (2) | 2022.11.01 |
---|---|
[빅데이터분산컴퓨팅] 2022.10.25 The Ratings Counter, Friends by Age, Filtering RDD's (0) | 2022.10.30 |
[빅데이터분산컴퓨팅] 2022.10.11 pyspark 환경 설정, word-count.py 실행 (0) | 2022.10.11 |
[빅데이터분산컴퓨팅] 2022.10.04 spark 설치 (4) | 2022.10.04 |
[빅데이터분산컴퓨팅] 2022.09.27 HDFS 아키텍처, 텍스트 파일 word count (0) | 2022.09.27 |