내용

글번호 234
작성자 heojk
작성일 2016-12-15 14:28:59
제목 pyspark 실습코드(전체)
내용 # spark-env.sh 에 삽입할 문장 export SPARK_DIST_CLASSPATH=$(hadoop classpath) # 데이터프레임형식 df = sqlContext.read.json("file:///home/eduuser/spark/ examples/src/main/resources/people.json") df.show() # 데이터프레임 조작 df.printSchema() df.select("name").show() df.select(df['name'], df['age'] + 1).show() df.filter(df['age'] > 21).show() df.groupBy("age").count().show() ## 리플렉션 이용 from pyspark.sql import Row # 텍스트 파일을 불러와 각 열을 전환 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # 스키마를 추정하고 데이터 프레임을 테이블로 등록 schemaPeople = sqlContext.createDataFrame(people) schemaPeople.registerTempTable("people") # 테이블로 등록이 된 데이터프레임에 대해 SQL 문장실행이 가능 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # SQL쿼리의 결과는 RDDs 이고 일반적인 RDD작업을 모두 지원 teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName) ## 프로그래밍 사용 from pyspark.sql.types import * # 텍스트 파일을 불러와 튜플 각 라인을 튜플 형식으로 변환 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: (p[0], p[1].strip())) # 스키마는 문자열로 인코딩 schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # 스키마를 RDD에 적용 schemaPeople = sqlContext.createDataFrame(people, schema) # 데이터프레임을 테이블로 등록 schemaPeople.registerTempTable("people") # 테이블로 등록된 데이터프레임에는 SQL문장 실행이 가능 results = sqlContext.sql("SELECT name FROM people") # SQL문장의 결과는 기본 RDD연산이 모두 수행되는 RDDs로 주어진다 names = results.map(lambda p: "Name: " + p.name) for name in names.collect(): print(name) ## 로드/저장 함수 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") ## 수동 조작 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") ###### 데이터 조작 실습 ## 리플렉션 이용 from pyspark.sql import Row lines = sc.textFile('jeju_2010.csv') parts = lines.map(lambda l: l.split(',')) jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2])) schema2010 = sqlContext.createDataFrame(jeju2010) schema2010.registerTempTable('jeju2010') schema2010.show() ## 수정 sqlContext.sql("select * from jeju2010 where INCOME!='INCOME'").show() jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME!='INCOME'") ## 프로그래밍 방법 from pyspark.sql.types import * lines = sc.textFile('jeju_2011.csv') parts = lines.map(lambda l:l.split(',')) jeju2011 = parts.map(lambda p:(p[0].strip(),p[1].strip(),p[2].strip())) schemaString = "IN OUT INCOME" fields=[StructField(field_name,StringType(),True) for field_name in schemaString.split()] schema2011 = sqlContext.createDataFrame(jeju2011,schema) schema2011.registerTempTable('jeju2011') ###### 데이터 분석 실습 ## 데이터 불러오기 from pyspark.sql import Row ? # RDD 생성 lines = sc.textFile('jeju_2010.csv') ? # 생성된 RDD를 콤마 구분자로 구분 parts = lines.map(lambda l: l.split(',')) ? # 앞의 RDD의 첫번째 두번째 세번째 column의 데이터 지정 jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2])) ? # 데이터 프레임 생성 schema2010 = sqlContext.createDataFrame(jeju2010) ? # 데이터 프레임을 테이블로 등록 schema2010.registerTempTable('jeju2010') ? # 눈으로 확인 schema2010.show() ## 데이터 수정 jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'") jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'") jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'") ## 데이터 합치기 # 임시로 year 라는 새로운 변수를 포함하는 RDD 생성 tmp = jeju2010.map(lambda p:Row(YEAR=2010,IN=p[0],OUT=p[2],INCOME=p[1])) ? # 생성된 RDD를 기반으로 데이터 프레임 만들기 jeju2010 = sqlContext.createDataFrame(tmp) # 2010년부터 2012년 까지 모두 실행 ? # 데이터 합치기 jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012) ## 데이터 형식 변환 # RDD 생성 tmp = jeju.map(lambda p:Row( IN=int(p[0]), OUT=int(p[1]), INCOME=int(p[2]),YEAR=int(p[3]) ) ) # RDD로부터 데이터프레임 형성 jeju2 = sqlContext.createDataFrame(tmp) ## 기술 통계 jeju2.describe().show() ## 그룹화 jeju2.groupBy('YEAR').avg().show() jeju2.select('IN','INCOME','YEAR').groupBy('YEAR').mean().show() jeju2['IN','OUT','YEAR'].groupBy('YEAR').mean().show() ## 상관계수 jeju2.corr('IN','OUT') jeju2.corr('IN','INCOME') jeju2.corr('OUT','INCOME') ## 회귀분석 준비 import numpy as np import numpy.linalg as lin # X 매트릭스 만들기 X = np.array(jeju2.select('IN','OUT').collect()) # Y 벡터 만들기 Y = np.array(jeju2.select('INCOME').collect()) ## 회귀모형 적합 # 상수가 없는 모형 Beta0 = np.dot(lin.inv(np.dot(X.T,X)),np.dot(X.T,Y)) # 상수가 있는 모형 X1 = np.hstack([np.array([np.ones(36)]).T,X]) Beta1 = np.dot(lin.inv(np.dot(X1.T,X1)),np.dot(X1.T,Y)) ## 결정계수 # 결정계수 R0 = np.sum((np.dot(X,Beta0)-np.mean(Y))**2)/np.sum((Y-np.mean(Y))**2) R1 = np.sum((np.dot(X1,Beta1)-np.mean(Y))**2)/np.sum((Y-np.mean(Y))**2) # 수정된 결정계수 adR0 = 1-(1-R0)*(36-1)/(36-2-1) adR1 = 1-(1-R1)*(36-1)/(36-3-1.) ## 시각화 import matplotlib.pylab as plt # 가장먼저 그려보는 점 그래프 plt.scatter(X[:,0],Y) plt.show() plt.scatter(X[:,1],Y) plt.show() ## 시각화 예제 Beta_IN = np.dot(lin.inv(np.dot(X1[:,[0,1>.T,X1[:,[0,1>)),np.dot(X1[:,[0,1>.T,Y)) y = Beta_IN[0] + Beta_IN[1]*X1[:,1] plt.scatter(X1[:,1],Y) plt.plot(X1[:,1],y) plt.show() plt.scatter(X1[:,1],Y,label='Raw Data') plt.plot(X1[:,1],y,label='Fitted',color='red') plt.legend(loc='upper left') plt.show() #외국 관관객 데이터를 이용한 시각화 Beta_IN = np.dot(lin.inv(np.dot(X1[:,[0,2>.T,X1[:,[0,2>)),np.dot(X1[:,[0,2>.T,Y)) y = Beta_IN[0] + Beta_IN[1]*X1[:,2] plt.scatter(X1[:,2],Y,label='Raw Data') plt.plot(X1[:,2],y,label='Fitted',color='red') plt.legend(loc='upper left') plt.show() ## 시각화 심화 import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D fig = plt.figure() ax = Axes3D(fig) y = Beta1[0] + Beta1[1] * X[:,0] + Beta1[2] * X[:,1] XX1,XX2 = np.meshgrid(X[:,0],X[:,1]) YY = Beta1[0] + Beta1[1] * XX1 + Beta1[2] * XX2 ax.plot(X[:,0],X[:,1],y,linestyle='none',marker='o',markerfacecolor='blue') ax.plot_surface(XX1,XX2,YY,rstride=1,cstride=1,cmap='hot') plt.show()
첨부파일 pyspark_src.txt (7,034byte)