내용 |
# spark-env.sh 에 삽입할 문장
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
# 데이터프레임형식
df = sqlContext.read.json("file:///home/eduuser/spark/
examples/src/main/resources/people.json")
df.show()
# 데이터프레임 조작
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()
## 리플렉션 이용
from pyspark.sql import Row
# 텍스트 파일을 불러와 각 열을 전환
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# 스키마를 추정하고 데이터 프레임을 테이블로 등록
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")
# 테이블로 등록이 된 데이터프레임에 대해 SQL 문장실행이 가능
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# SQL쿼리의 결과는 RDDs 이고 일반적인 RDD작업을 모두 지원
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print(teenName)
## 프로그래밍 사용
from pyspark.sql.types import *
# 텍스트 파일을 불러와 튜플 각 라인을 튜플 형식으로 변환
lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))
# 스키마는 문자열로 인코딩
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# 스키마를 RDD에 적용
schemaPeople = sqlContext.createDataFrame(people, schema)
# 데이터프레임을 테이블로 등록
schemaPeople.registerTempTable("people")
# 테이블로 등록된 데이터프레임에는 SQL문장 실행이 가능
results = sqlContext.sql("SELECT name FROM people")
# SQL문장의 결과는 기본 RDD연산이 모두 수행되는 RDDs로 주어진다
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
print(name)
## 로드/저장 함수
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
## 수동 조작
df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
###### 데이터 조작 실습
## 리플렉션 이용
from pyspark.sql import Row
lines = sc.textFile('jeju_2010.csv')
parts = lines.map(lambda l: l.split(','))
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))
schema2010 = sqlContext.createDataFrame(jeju2010)
schema2010.registerTempTable('jeju2010')
schema2010.show()
## 수정
sqlContext.sql("select * from jeju2010 where INCOME!='INCOME'").show()
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME!='INCOME'")
## 프로그래밍 방법
from pyspark.sql.types import *
lines = sc.textFile('jeju_2011.csv')
parts = lines.map(lambda l:l.split(','))
jeju2011 = parts.map(lambda p:(p[0].strip(),p[1].strip(),p[2].strip()))
schemaString = "IN OUT INCOME"
fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema2011 = sqlContext.createDataFrame(jeju2011,schema)
schema2011.registerTempTable('jeju2011')
###### 데이터 분석 실습
## 데이터 불러오기
from pyspark.sql import Row
?
# RDD 생성
lines = sc.textFile('jeju_2010.csv')
?
# 생성된 RDD를 콤마 구분자로 구분
parts = lines.map(lambda l: l.split(','))
?
# 앞의 RDD의 첫번째 두번째 세번째 column의 데이터 지정
jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))
?
# 데이터 프레임 생성
schema2010 = sqlContext.createDataFrame(jeju2010)
?
# 데이터 프레임을 테이블로 등록
schema2010.registerTempTable('jeju2010')
?
# 눈으로 확인
schema2010.show()
## 데이터 수정
jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'")
jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'")
## 데이터 합치기
# 임시로 year 라는 새로운 변수를 포함하는 RDD 생성
tmp = jeju2010.map(lambda p:Row(YEAR=2010,IN=p[0],OUT=p[2],INCOME=p[1]))
?
# 생성된 RDD를 기반으로 데이터 프레임 만들기
jeju2010 = sqlContext.createDataFrame(tmp)
# 2010년부터 2012년 까지 모두 실행
?
# 데이터 합치기
jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012)
## 데이터 형식 변환
# RDD 생성
tmp = jeju.map(lambda p:Row( IN=int(p[0]), OUT=int(p[1]), INCOME=int(p[2]),YEAR=int(p[3]) ) )
# RDD로부터 데이터프레임 형성
jeju2 = sqlContext.createDataFrame(tmp)
## 기술 통계
jeju2.describe().show()
## 그룹화
jeju2.groupBy('YEAR').avg().show()
jeju2.select('IN','INCOME','YEAR').groupBy('YEAR').mean().show()
jeju2['IN','OUT','YEAR'].groupBy('YEAR').mean().show()
## 상관계수
jeju2.corr('IN','OUT')
jeju2.corr('IN','INCOME')
jeju2.corr('OUT','INCOME')
## 회귀분석 준비
import numpy as np
import numpy.linalg as lin
# X 매트릭스 만들기
X = np.array(jeju2.select('IN','OUT').collect())
# Y 벡터 만들기
Y = np.array(jeju2.select('INCOME').collect())
## 회귀모형 적합
# 상수가 없는 모형
Beta0 = np.dot(lin.inv(np.dot(X.T,X)),np.dot(X.T,Y))
# 상수가 있는 모형
X1 = np.hstack([np.array([np.ones(36)]).T,X])
Beta1 = np.dot(lin.inv(np.dot(X1.T,X1)),np.dot(X1.T,Y))
## 결정계수
# 결정계수
R0 = np.sum((np.dot(X,Beta0)-np.mean(Y))**2)/np.sum((Y-np.mean(Y))**2)
R1 = np.sum((np.dot(X1,Beta1)-np.mean(Y))**2)/np.sum((Y-np.mean(Y))**2)
# 수정된 결정계수
adR0 = 1-(1-R0)*(36-1)/(36-2-1)
adR1 = 1-(1-R1)*(36-1)/(36-3-1.)
## 시각화
import matplotlib.pylab as plt
# 가장먼저 그려보는 점 그래프
plt.scatter(X[:,0],Y)
plt.show()
plt.scatter(X[:,1],Y)
plt.show()
## 시각화 예제
Beta_IN = np.dot(lin.inv(np.dot(X1[:,[0,1>.T,X1[:,[0,1>)),np.dot(X1[:,[0,1>.T,Y))
y = Beta_IN[0] + Beta_IN[1]*X1[:,1]
plt.scatter(X1[:,1],Y)
plt.plot(X1[:,1],y)
plt.show()
plt.scatter(X1[:,1],Y,label='Raw Data')
plt.plot(X1[:,1],y,label='Fitted',color='red')
plt.legend(loc='upper left')
plt.show()
#외국 관관객 데이터를 이용한 시각화
Beta_IN = np.dot(lin.inv(np.dot(X1[:,[0,2>.T,X1[:,[0,2>)),np.dot(X1[:,[0,2>.T,Y))
y = Beta_IN[0] + Beta_IN[1]*X1[:,2]
plt.scatter(X1[:,2],Y,label='Raw Data')
plt.plot(X1[:,2],y,label='Fitted',color='red')
plt.legend(loc='upper left')
plt.show()
## 시각화 심화
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
ax = Axes3D(fig)
y = Beta1[0] + Beta1[1] * X[:,0] + Beta1[2] * X[:,1]
XX1,XX2 = np.meshgrid(X[:,0],X[:,1])
YY = Beta1[0] + Beta1[1] * XX1 + Beta1[2] * XX2
ax.plot(X[:,0],X[:,1],y,linestyle='none',marker='o',markerfacecolor='blue')
ax.plot_surface(XX1,XX2,YY,rstride=1,cstride=1,cmap='hot')
plt.show()
|