Pyspark.sql.DataFrameWriter.saveAsTable()
먼저 write.saveAsTable() 함수를 사용하여 기존 PySpark DataFrame을 테이블에 쓰는 방법을 살펴보겠습니다. DataFrame을 테이블에 쓰기 위해 테이블 이름과 modes, partionBy 등과 같은 기타 선택적 매개변수를 사용합니다. 쪽모이 세공 파일로 저장됩니다.
통사론:
dataframe_obj.write.saveAsTable(경로/Table_name,mode,partitionBy,…)
- Table_name은 dataframe_obj에서 생성된 테이블의 이름입니다.
- mode 매개변수를 사용하여 테이블의 데이터를 추가/덮어쓸 수 있습니다.
- partitionBy는 단일/다중 열을 사용하여 제공된 이러한 열의 값을 기반으로 파티션을 만듭니다.
예 1:
5개의 행과 4개의 열이 있는 PySpark DataFrame을 만듭니다. 이 데이터 프레임을 'Agri_Table1'이라는 테이블에 씁니다.
파이스파크 가져오기
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
# 5개의 행과 5개의 열이 있는 농업 데이터
농업 =[{ '토양_유형' : '검은색' , '관개_가용성' : '아니요' , '에이커스' : 2500 , '토양_상태' : '마른' ,
'국가' : '미국' },
{ '토양_유형' : '검은색' , '관개_가용성' : '예' , '에이커스' : 3500 , '토양_상태' : '젖은' ,
'국가' : '인도' },
{ '토양_유형' : '빨간색' , '관개_가용성' : '예' , '에이커스' : 210 , '토양_상태' : '마른' ,
'국가' : '영국' },
{ '토양_유형' : '다른' , '관개_가용성' : '아니요' , '에이커스' : 1000 , '토양_상태' : '젖은' ,
'국가' : '미국' },
{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 500 , '토양_상태' : '마른' ,
'국가' : '인도' }]
# 위 데이터로 데이터프레임 생성
agri_df = linuxhint_spark_app.createDataFrame(농업)
agri_df.show()
# 위의 DataFrame을 테이블에 씁니다.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
산출:
이전 PySpark 데이터로 하나의 parquet 파일이 생성된 것을 볼 수 있습니다.
예 2:
이전 DataFrame을 고려하고 '국가' 열의 값을 기반으로 레코드를 분할하여 테이블에 'Agri_Table2'를 씁니다.
# 위의 DataFrame을 partitionBy 매개변수로 테이블에 씁니다.agri_df.write.saveAsTable( 'Agri_Table2' ,파티션 기준=[ '국가' ])
산출:
'국가' 열에는 '인도', '영국' 및 '미국'의 세 가지 고유한 값이 있습니다. 따라서 세 개의 파티션이 생성됩니다. 각 파티션에는 쪽모이 세공 파일이 있습니다.
Pyspark.sql.DataFrameReader.table()
spark.read.table() 함수를 사용하여 PySpark DataFrame에 테이블을 로드해 보겠습니다. 경로/테이블 이름인 매개변수 하나만 사용합니다. 테이블을 PySpark DataFrame에 직접 로드하고 PySpark DataFrame에 적용되는 모든 SQL 함수를 이 로드된 DataFrame에도 적용할 수 있습니다.
통사론:
spark_app.read.table(경로/'테이블 이름')이 시나리오에서는 PySpark DataFrame에서 생성된 이전 테이블을 사용합니다. 환경에서 이전 시나리오 코드 스니펫을 구현해야 하는지 확인하십시오.
예:
'Agri_Table1' 테이블을 'loaded_data'라는 DataFrame에 로드합니다.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
산출:
테이블이 PySpark DataFrame에 로드된 것을 볼 수 있습니다.
SQL 쿼리 실행
이제 spark.sql() 함수를 사용하여 로드된 DataFrame에서 일부 SQL 쿼리를 실행합니다.
# SELECT 명령을 사용하여 위 테이블의 모든 열을 표시합니다.linuxhint_spark_app.sql( 'Agri_Table1에서 * 선택' ).보여주다()
# WHERE 절
linuxhint_spark_app.sql( 'Agri_Table1 WHERE Soil_status='건조'에서 * 선택' ).보여주다()
linuxhint_spark_app.sql( 'Agri_Table1 WHERE 에이커 > 2000에서 * 선택' ).보여주다()
산출:
- 첫 번째 쿼리는 DataFrame의 모든 열과 레코드를 표시합니다.
- 두 번째 쿼리는 'Soil_status' 열을 기반으로 레코드를 표시합니다. 'Dry' 요소가 있는 레코드는 3개뿐입니다.
- 마지막 쿼리는 'Acres'가 2000보다 큰 두 개의 레코드를 반환합니다.
Pyspark.sql.DataFrameWriter.insertInto()
insertInto() 함수를 사용하여 DataFrame을 기존 테이블에 추가할 수 있습니다. 이 함수를 selectExpr()과 함께 사용하여 열 이름을 정의한 다음 테이블에 삽입할 수 있습니다. 이 함수는 또한 tableName을 매개변수로 사용합니다.
통사론:
DataFrame_obj.write.insertInto('테이블 이름')이 시나리오에서는 PySpark DataFrame에서 생성된 이전 테이블을 사용합니다. 환경에서 이전 시나리오 코드 스니펫을 구현해야 하는지 확인하십시오.
예:
두 개의 레코드가 있는 새 DataFrame을 만들고 'Agri_Table1' 테이블에 삽입합니다.
파이스파크 가져오기pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
# 2행의 농업 데이터
농업 =[{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 2500 , '토양_상태' : '마른' ,
'국가' : '미국' },
{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 1200 , '토양_상태' : '젖은' ,
'국가' : '일본' }]
# 위 데이터로 데이터프레임 생성
agri_df2 = linuxhint_spark_app.createDataFrame(농업)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( '에이커스' , '국가' , '관개_가용성' , '토양_유형' ,
'토양_상태' ).write.insertInto( 'Agri_Table1' )
# 최종 Agri_Table1 표시
linuxhint_spark_app.sql( 'Agri_Table1에서 * 선택' ).보여주다()
산출:
이제 DataFrame에 있는 총 행 수는 7입니다.
결론
이제 write.saveAsTable() 함수를 사용하여 PySpark DataFrame을 테이블에 쓰는 방법을 이해했습니다. 테이블 이름과 기타 선택적 매개변수를 사용합니다. 그런 다음 spark.read.table() 함수를 사용하여 이 테이블을 PySpark DataFrame에 로드했습니다. 경로/테이블 이름인 매개변수 하나만 사용합니다. 새 DataFrame을 기존 테이블에 추가하려면 insertInto() 함수를 사용하십시오.