
[Big Data Distributed Computing] 2022.10.25 The Ratings Counter, Friends by Age, Filtering RDD's

sseni 2022. 10. 30. 23:26

## 1H ## RatingCounter


from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingHistogram")
sc = SparkContext(conf = conf)

lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/ml-100k/")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" %(key, value))
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
1 6110
2 11370
3 27145
4 34174
5 21201
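  • SparkConf().setMaster("local").setAppName("RatingHistogram")
    • "local" runs Spark on the local machine in a single thread (no cluster); the app name identifies the job in the Spark UI
  • sc = SparkContext(conf = conf)
    • creates the SparkContext from the configured conf; a kind of environment setup
  • lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/ml-100k/")  (loads the 100,000 ratings, one record per line)
    • Load the data
  • ratings = lines.map(lambda x: x.split()[2])
    • Extract (map) the data we care about: the rating, which is the third field
  • result = ratings.countByValue()
    • Action: count how many times each rating value occurs
  • The results are sorted by key (the rating) before printing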
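The counting logic can be checked without Spark. Below is a minimal plain-Python sketch of the same map + countByValue + sort steps, run on a few illustrative lines in the ml-100k rating layout (user id, item id, rating, timestamp, whitespace-separated). The sample lines and the helper name `rating_histogram` are hypothetical; `collections.Counter` stands in for `countByValue()`.

```python
from collections import Counter, OrderedDict

# Illustrative sample lines in the ml-100k rating layout (hypothetical data):
# user id, item id, rating, timestamp (whitespace-separated)
SAMPLE_LINES = [
    "196 242 3 881250949",
    "186 302 3 891717742",
    "22 377 1 878887116",
    "244 51 2 880606923",
    "166 346 1 886397596",
]

def rating_histogram(lines):
    # map: keep only the third field (the rating), like lines.map(lambda x: x.split()[2])
    ratings = (line.split()[2] for line in lines)
    # countByValue: {rating: number of occurrences}
    result = Counter(ratings)
    # sort by key, as in collections.OrderedDict(sorted(result.items()))
    return OrderedDict(sorted(result.items()))

for key, value in rating_histogram(SAMPLE_LINES).items():
    print("%s %i" % (key, value))
```

Note that the keys stay strings, exactly as in the Spark version; for single-digit ratings the string sort and the numeric sort agree.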

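Difference between transformations and actions: a transformation creates a new RDD from an RDD (e.g. map, filter), while an action turns an RDD into a different kind of value, such as a scalar or a Python dict (e.g. countByValue, collect). Transformations are lazy: nothing is computed until an action is called.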
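As a rough analogy only (this is plain Python, not Spark), a generator expression behaves like a lazy transformation and `list()` like an action. The sketch counts calls to a hypothetical `extract_rating` helper to show that no work happens until the result is requested.

```python
calls = []

def extract_rating(line):
    # record each call so we can see when the work actually happens
    calls.append(line)
    return line.split()[2]

lines = ["196 242 3 881250949", "186 302 3 891717742"]

# like a transformation: describes the work but runs nothing yet
ratings = (extract_rating(line) for line in lines)
calls_before = len(calls)   # 0: extract_rating has not been called

# like an action: forces evaluation and returns an ordinary Python value
collected = list(ratings)
calls_after = len(calls)    # 2: one call per line

print(calls_before, calls_after, collected)
```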

Before running a script with spark-submit, try it in spark-shell first.


Running jobs can be viewed in the Spark web UI at http://localhost:4040/jobs/

Because we use stand-alone mode, only one is shown.

## Assign 02 ##

From u.user, 1) print the frequency by age, and 2) print the frequency by gender.

from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingHistogram")
sc = SparkContext(conf = conf)

lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/ml-100k/u.user")
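# u.user fields: user id | age | gender | occupation | zip code
# 2) by gender:
# ratings = lines.map(lambda x: x.split("|")[2])
# 1) by age: int() makes the ages sort numerically (as strings, "7" would sort after "69")
ratings = lines.map(lambda x: int(x.split("|")[1]))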

result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" %(key, value))
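A plain-Python sketch of both parts of the assignment, assuming the u.user layout `user id | age | gender | occupation | zip code`. The sample rows are made up for illustration and `frequencies` is a hypothetical helper; `Counter` plays the role of `countByValue()`.

```python
from collections import Counter

# Made-up rows in the u.user layout: user id | age | gender | occupation | zip code
SAMPLE_USERS = [
    "1|24|M|technician|85711",
    "2|53|F|other|94043",
    "3|23|M|writer|32067",
    "4|24|M|technician|43537",
    "5|7|F|student|15213",
]

def frequencies(lines, field, convert=str):
    # map each line to one field, then count occurrences (like countByValue)
    counts = Counter(convert(line.split("|")[field]) for line in lines)
    return sorted(counts.items())

# 1) by age: int() makes the sort numeric, so 7 comes before 23
print(frequencies(SAMPLE_USERS, 1, int))
# 2) by gender
print(frequencies(SAMPLE_USERS, 2))
```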

## 2H ## Friends by Age

Friends by age: an example that computes the average number of friends for each age

key : age, value : number of friends 

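  • reduceByKey(): combines the values that share a key using the given function
  • groupByKey(): groups the values that share a key together
  • sortByKey(): sorts the RDD by key
  • keys(), values(): create a new RDD containing only the keys, or only the values
  • mapValues(), flatMapValues(): apply a function to the values only, leaving the keys unchanged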
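To make the semantics of these pair-RDD operations concrete, here are hypothetical plain-Python stand-ins that work on a list of (key, value) tuples. They only mimic the results: Spark runs the real operations in parallel across partitions and does not guarantee the output order of reduceByKey or groupByKey. Also, unlike this sketch, Spark's reduceByKey combines values within each partition before shuffling, which is why it is usually preferred over groupByKey followed by a sum.

```python
from collections import defaultdict
from functools import reduce

pairs = [(33, 385), (26, 2), (33, 2), (55, 221)]

def group_by_key(pairs):
    # groupByKey(): collect every value that shares a key into one list
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return list(groups.items())

def reduce_by_key(pairs, func):
    # reduceByKey(): fold the values of each key together with func
    return [(k, reduce(func, vs)) for k, vs in group_by_key(pairs)]

def sort_by_key(pairs):
    # sortByKey(): order the pairs by key (ascending)
    return sorted(pairs, key=lambda kv: kv[0])

def map_values(pairs, func):
    # mapValues(): transform each value, keep the key as-is
    return [(k, func(v)) for k, v in pairs]

def flat_map_values(pairs, func):
    # flatMapValues(): func returns an iterable; each item becomes its own pair
    return [(k, item) for k, v in pairs for item in func(v)]

keys = [k for k, _ in pairs]      # keys()
values = [v for _, v in pairs]    # values()

print(reduce_by_key(pairs, lambda x, y: x + y))
print(sort_by_key(pairs))
print(map_values(pairs, lambda x: (x, 1)))
print(flat_map_values([(1, "a b")], str.split))
```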

Input: ID, name, age, number of friends (comma-separated)


from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)

  • mapValues(lambda x: (x, 1))
    • (33, 385) => (33, (385, 1))
    • (33, 2) => (33, (2, 1))
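  • reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    • for records with the same key, the key stays fixed and the value tuples are added element-wise (sum of friends, number of people)
    • (33, 385) => (33, (385, 1))
    • (33, 2) => (33, (2, 1))
    • ==> (33, (387, 2))
  • averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
    • computes the average (total friends / number of people)
    • (33, (387, 2)) ==> (33, 193.5)
  • averagesByAge.collect()
    • action: gathers the results back to the driver as a Python list, which is then printed
  • totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    • mapValues runs first, then reduceByKey runs on its output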
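The same average-by-age pipeline in plain Python, useful for checking the arithmetic in the bullets above: the (33, 385) and (33, 2) records average to 193.5. The sample CSV rows and the helper name `average_friends_by_age` are made up for illustration; a dict of running (sum, count) tuples stands in for mapValues + reduceByKey.

```python
# Made-up rows in the fakefriends.csv layout: ID, name, age, number of friends
SAMPLE_ROWS = [
    "0,Will,33,385",
    "1,Jean-Luc,26,2",
    "2,Hugh,55,221",
    "3,Deanna,33,2",
]

def parseLine(line):
    fields = line.split(',')
    return (int(fields[2]), int(fields[3]))  # (age, numFriends)

def average_friends_by_age(lines):
    totals = {}
    for age, num_friends in map(parseLine, lines):
        # mapValues(lambda x: (x, 1)) + reduceByKey(...):
        # keep a running (sum of friends, count of people) per age
        total, count = totals.get(age, (0, 0))
        totals[age] = (total + num_friends, count + 1)
    # mapValues(lambda x: x[0] / x[1]) turns (sum, count) into the average
    return {age: total / count for age, (total, count) in totals.items()}

print(sorted(average_friends_by_age(SAMPLE_ROWS).items()))
```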



(33, 325.3333333333333)
(26, 242.05882352941177)
(55, 295.53846153846155)
(40, 250.8235294117647)
(68, 269.6)
(59, 220.0)
(37, 249.33333333333334)
(54, 278.0769230769231)
(38, 193.53333333333333)
(27, 228.125)
(53, 222.85714285714286)
(57, 258.8333333333333)
(56, 306.6666666666667)
(43, 230.57142857142858)
(36, 246.6)
(22, 206.42857142857142)
(35, 211.625)
(45, 309.53846153846155)
(60, 202.71428571428572)
(67, 214.625)
(19, 213.27272727272728)
(30, 235.8181818181818)
(51, 302.14285714285717)
(25, 197.45454545454547)
(21, 350.875)
(42, 303.5)
(49, 184.66666666666666)
(48, 281.4)
(50, 254.6)
(39, 169.28571428571428)
(32, 207.9090909090909)
(58, 116.54545454545455)
(64, 281.3333333333333)
(31, 267.25)
(52, 340.6363636363636)
(24, 233.8)
(20, 165.0)
(62, 220.76923076923077)
(41, 268.55555555555554)
(44, 282.1666666666667)
(69, 235.2)
(65, 298.2)
(61, 256.22222222222223)
(28, 209.1)
(66, 276.44444444444446)
(46, 223.69230769230768)
(29, 215.91666666666666)
(18, 343.375)
(47, 233.22222222222223)
(34, 245.5)
(63, 384.0)
(23, 246.3)


results_sort = averagesByAge.sortByKey()
results = results_sort.collect()


(18, 343.375)
(19, 213.27272727272728)
(20, 165.0)
(21, 350.875)
(22, 206.42857142857142)
(23, 246.3)
(24, 233.8)
(25, 197.45454545454547)
(26, 242.05882352941177)
(27, 228.125)
(28, 209.1)
(29, 215.91666666666666)
(30, 235.8181818181818)
(31, 267.25)
(32, 207.9090909090909)
(33, 325.3333333333333)
(34, 245.5)
(35, 211.625)
(36, 246.6)
(37, 249.33333333333334)
(38, 193.53333333333333)
(39, 169.28571428571428)
(40, 250.8235294117647)
(41, 268.55555555555554)
(42, 303.5)
(43, 230.57142857142858)
(44, 282.1666666666667)
(45, 309.53846153846155)
(46, 223.69230769230768)
(47, 233.22222222222223)
(48, 281.4)
(49, 184.66666666666666)
(50, 254.6)
(51, 302.14285714285717)
(52, 340.6363636363636)
(53, 222.85714285714286)
(54, 278.0769230769231)
(55, 295.53846153846155)
(56, 306.6666666666667)
(57, 258.8333333333333)
(58, 116.54545454545455)
(59, 220.0)
(60, 202.71428571428572)
(61, 256.22222222222223)
(62, 220.76923076923077)
(63, 384.0)
(64, 281.3333333333333)
(65, 298.2)
(66, 276.44444444444446)
(67, 214.625)
(68, 269.6)
(69, 235.2)

## Assign 03 ##

1) The program above prints its results sorted by age. Change it to print them sorted by (average) number of friends.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])

results = averagesByAge.collect()
result_sort = sorted(results, key=lambda x: x[1])

for result in result_sort:
    print(result)

(58, 116.54545454545455)
(20, 165.0)
(39, 169.28571428571428)
(49, 184.66666666666666)
(38, 193.53333333333333)
(25, 197.45454545454547)
(60, 202.71428571428572)
(22, 206.42857142857142)
(32, 207.9090909090909)
(28, 209.1)
(35, 211.625)
(19, 213.27272727272728)
(67, 214.625)
(29, 215.91666666666666)
(59, 220.0)
(62, 220.76923076923077)
(53, 222.85714285714286)
(46, 223.69230769230768)
(27, 228.125)
(43, 230.57142857142858)
(47, 233.22222222222223)
(24, 233.8)
(69, 235.2)
(30, 235.8181818181818)
(26, 242.05882352941177)
(34, 245.5)
(23, 246.3)
(36, 246.6)
(37, 249.33333333333334)
(40, 250.8235294117647)
(50, 254.6)
(61, 256.22222222222223)
(57, 258.8333333333333)
(31, 267.25)
(41, 268.55555555555554)
(68, 269.6)
(66, 276.44444444444446)
(54, 278.0769230769231)
(64, 281.3333333333333)
(48, 281.4)
(44, 282.1666666666667)
(55, 295.53846153846155)
(65, 298.2)
(51, 302.14285714285717)
(42, 303.5)
(56, 306.6666666666667)
(45, 309.53846153846155)
(33, 325.3333333333333)
(52, 340.6363636363636)
(18, 343.375)
(21, 350.875)
(63, 384.0)

## 3H ##

1) Filtering RDD's (Weather Data)

Extract only the records that meet a specific condition (here, entries whose type is "TMIN")


from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # fields[3] is in tenths of a degree Celsius; convert to Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
results = minTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
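  • minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
    • keeps only the entries whose type contains "TMIN" and stores them in minTemps
  • stationTemps = minTemps.map(lambda x: (x[0], x[2]))
    • maps each entry of minTemps to (x[0], x[2])
    • key / value pairs (stationID, temperature)
  • minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
    • keeps the lowest temperature for each station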


ITE00100554     5.36F
EZE00100082     7.70F
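A plain-Python sketch of the filter -> map -> reduceByKey(min) chain, assuming the 1800.csv layout that parseLine relies on (station ID, date, entry type, value in tenths of a degree Celsius). The sample rows and the helper name `min_temps` are invented for illustration.

```python
# Invented rows: station ID, date, entry type, value (tenths of a degree Celsius)
SAMPLE = [
    "ITE00100554,18000101,TMAX,-75",
    "ITE00100554,18000101,TMIN,-148",
    "ITE00100554,18000102,TMIN,-125",
    "EZE00100082,18000101,TMIN,-86",
    "EZE00100082,18000101,PRCP,0",
]

def parseLine(line):
    fields = line.split(',')
    # tenths of a degree Celsius -> degrees Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (fields[0], fields[2], temperature)

def min_temps(lines):
    parsed = map(parseLine, lines)
    # filter: keep only TMIN entries
    tmin = (p for p in parsed if "TMIN" in p[1])
    # map to (stationID, temperature), then reduce each key with min()
    result = {}
    for station, temp in ((p[0], p[2]) for p in tmin):
        result[station] = min(result.get(station, temp), temp)
    return result

for station, temp in min_temps(SAMPLE).items():
    print(station + "\t{:.2f}F".format(temp))
```

Swapping "TMIN" for "TMAX" and min() for max() gives the maximum-temperature variant.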



from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///C:/Users/jsl11/SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))
results = maxTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))


ITE00100554     90.14F
EZE00100082     90.14F


2) Map vs FlatMap

  • map: produces exactly one element of the new RDD for each input element
    • ex. The quick red fox jumped
    • upper = lines.map(lambda x: x.upper()) ==> THE QUICK RED FOX JUMPED ( a single element )
  • flatMap: can produce many new elements from each input element
    • ex. The quick red fox jumped
    • words = lines.flatMap(lambda x: x.split()) ==> The / quick / red / fox / jumped ( each word becomes its own element )
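The difference can be reproduced with ordinary Python lists. In this plain-Python sketch (no Spark), a list comprehension plays the role of map and `itertools.chain.from_iterable` plays the role of flatMap; `nested` shows what map would give if it were used with split().

```python
from itertools import chain

lines = ["The quick red fox jumped"]

# map: exactly one output element per input element
upper = [x.upper() for x in lines]
# map with split(): still one element per line, but each element is a list
nested = [x.split() for x in lines]
# flatMap: each input may yield many outputs, flattened into one sequence
words = list(chain.from_iterable(x.split() for x in lines))

print(upper)
print(nested)
print(words)
```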


from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///C:/Users/jsl11/SparkCourse/in/word_count.text")
words = input.flatMap(lambda x: x.split())
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))
nationally 1
globallyrenowned. 1
manufacturing 1
eroded 1
restructuring 1
industry, 1
transitioned 1
into 1
industries. 1
September 1
11 1
attacks 2
2001 1
destroyed 1
Trade 1
Center, 1
killing 1
almost 1
3,000 1
people; 1
they 1
largest 1
terrorist 1
on 1
soil. 1


3) Improving Word Count

Problems: uppercase vs. lowercase, commas, and periods ==> normalize the data (as in natural language processing), then sort the results


import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///C:/Users/jsl11/SparkCourse/in/word_count.text")
words = input.flatMap(normalizeWords)
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))
nationally 1
globallyrenowned 1
manufacturing 1
eroded 1
restructuring 1
industry 1
transitioned 1
into 1
september 1
11 1
attacks 2
2001 1
destroyed 1
killing 1
almost 1
3 1
people 1
they 1
largest 1
terrorist 1
on 1
soil 1
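A quick standalone check of normalizeWords (the same function as in the code above, no Spark needed) shows why the output changed: "3,000" splits into "3" and "000", and text ending in punctuation leaves a trailing empty string, which the `if (cleanWord):` test then skips. The input strings are just examples.

```python
import re

def normalizeWords(text):
    # lowercase, then split on runs of non-word characters
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

print(normalizeWords("Almost 3,000 people; they"))
print(normalizeWords("World Trade Center."))
```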


4) Sorting the Results


import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///C:/Users/jsl11/SparkCourse/in/word_count.text")
words = input.flatMap(normalizeWords)

wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
results = wordCountsSorted.collect()

for result in results:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if (word):
        print(word.decode() + ":\t\t" + count)
opened:         3
world:          3
around:         4
trade:          4
century:                4
during:         4
united:         4
states:         4
first:          5
as:             5
s:              6
became:         6
war:            6
state:          7
was:            8
from:           8
city:           10
a:              11
to:             17
york:           20
new:            21
and:            21
in:             24
of:             33
the:            81

==> All words are now lowercase, and the results are sorted by count in ascending order
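Why the key and value are swapped: sortByKey sorts by the key, so the (word, count) pairs are flipped to (count, word) first. A plain-Python sketch of the same steps on a made-up, already-normalized word list, with `Counter` standing in for map + reduceByKey:

```python
from collections import Counter

# Made-up, already-normalized words
words = ["the", "city", "the", "new", "york", "new", "the"]

# map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) -> word: count
wordCounts = Counter(words)
# map(lambda x: (x[1], x[0])): flip to (count, word) so the count is the key
flipped = [(count, word) for word, count in wordCounts.items()]
# sortByKey(): ascending by key, i.e. by count
wordCountsSorted = sorted(flipped, key=lambda kv: kv[0])

for count, word in wordCountsSorted:
    print(word + ":\t\t" + str(count))
```

Spark's sortByKey also accepts ascending=False, which would produce a descending order on the cluster instead of re-sorting the collected list on the driver.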

## Assign 04 ##

Sort in reverse (descending) order and print

import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///C:/Users/jsl11/SparkCourse/in/word_count.text")
words = input.flatMap(normalizeWords)

wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()

results = wordCountsSorted.collect()
results_ = sorted(results, key = lambda x : x[0], reverse=True)

for result in results_:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if (word):
        print(word.decode() + ":\t\t" + count)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
the: 81
of: 33
in: 24
new: 21
and: 21
york: 20
to: 17
a: 11
city: 10
was: 8
from: 8
state: 7
s: 6
became: 6
war: 6
first: 5
as: 5
around: 4
trade: 4
century: 4
during: 4
united: 4
states: 4



