PySpark에서 테이블 데이터를 읽고 쓰는 방법

Pysparkeseo Teibeul Deiteoleul Ilg Go Sseuneun Bangbeob



PySpark에서의 데이터 처리는 데이터가 테이블 형태로 로드되면 더 빨라집니다. 이와 함께 SQl 표현식을 사용하면 처리가 빨라집니다. 따라서 처리를 위해 보내기 전에 PySpark DataFrame/RDD를 테이블로 변환하는 것이 더 나은 접근 방식입니다. 오늘은 테이블 데이터를 PySpark DataFrame으로 읽고, PySpark DataFrame을 테이블에 쓰고, 내장 함수를 사용하여 기존 테이블에 새 DataFrame을 삽입하는 방법을 살펴보겠습니다. 갑시다!

Pyspark.sql.DataFrameWriter.saveAsTable()

먼저 write.saveAsTable() 함수를 사용하여 기존 PySpark DataFrame을 테이블에 쓰는 방법을 살펴보겠습니다. DataFrame을 테이블에 쓰기 위해 테이블 ​​이름과 modes, partionBy 등과 같은 기타 선택적 매개변수를 사용합니다. 쪽모이 세공 파일로 저장됩니다.

통사론:







dataframe_obj.write.saveAsTable(경로/Table_name,mode,partitionBy,…)
  1. Table_name은 dataframe_obj에서 생성된 테이블의 이름입니다.
  2. mode 매개변수를 사용하여 테이블의 데이터를 추가/덮어쓸 수 있습니다.
  3. partitionBy는 단일/다중 열을 사용하여 제공된 이러한 열의 값을 기반으로 파티션을 만듭니다.

예 1:

5개의 행과 4개의 열이 있는 PySpark DataFrame을 만듭니다. 이 데이터 프레임을 'Agri_Table1'이라는 테이블에 씁니다.



파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# 5개의 행과 5개의 열이 있는 농업 데이터

농업 =[{ '토양_유형' : '검은색' , '관개_가용성' : '아니요' , '에이커스' : 2500 , '토양_상태' : '마른' ,
'국가' : '미국' },

{ '토양_유형' : '검은색' , '관개_가용성' : '예' , '에이커스' : 3500 , '토양_상태' : '젖은' ,
'국가' : '인도' },

{ '토양_유형' : '빨간색' , '관개_가용성' : '예' , '에이커스' : 210 , '토양_상태' : '마른' ,
'국가' : '영국' },

{ '토양_유형' : '다른' , '관개_가용성' : '아니요' , '에이커스' : 1000 , '토양_상태' : '젖은' ,
'국가' : '미국' },

{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 500 , '토양_상태' : '마른' ,
'국가' : '인도' }]



# 위 데이터로 데이터프레임 생성

agri_df = linuxhint_spark_app.createDataFrame(농업)

agri_df.show()

# 위의 DataFrame을 테이블에 씁니다.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

산출:







이전 PySpark 데이터로 하나의 parquet 파일이 생성된 것을 볼 수 있습니다.



예 2:

이전 DataFrame을 고려하고 '국가' 열의 값을 기반으로 레코드를 분할하여 테이블에 'Agri_Table2'를 씁니다.

# 위의 DataFrame을 partitionBy 매개변수로 테이블에 씁니다.

agri_df.write.saveAsTable( 'Agri_Table2' ,파티션 기준=[ '국가' ])

산출:

'국가' 열에는 '인도', '영국' 및 '미국'의 세 가지 고유한 값이 있습니다. 따라서 세 개의 파티션이 생성됩니다. 각 파티션에는 쪽모이 세공 파일이 있습니다.

Pyspark.sql.DataFrameReader.table()

spark.read.table() 함수를 사용하여 PySpark DataFrame에 테이블을 로드해 보겠습니다. 경로/테이블 이름인 매개변수 하나만 사용합니다. 테이블을 PySpark DataFrame에 직접 로드하고 PySpark DataFrame에 적용되는 모든 SQL 함수를 이 로드된 DataFrame에도 적용할 수 있습니다.

통사론:

spark_app.read.table(경로/'테이블 이름')

이 시나리오에서는 PySpark DataFrame에서 생성된 이전 테이블을 사용합니다. 환경에서 이전 시나리오 코드 스니펫을 구현해야 하는지 확인하십시오.

예:

'Agri_Table1' 테이블을 'loaded_data'라는 DataFrame에 로드합니다.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

산출:

테이블이 PySpark DataFrame에 로드된 것을 볼 수 있습니다.

SQL 쿼리 실행

이제 spark.sql() 함수를 사용하여 로드된 DataFrame에서 일부 SQL 쿼리를 실행합니다.

# SELECT 명령을 사용하여 위 테이블의 모든 열을 표시합니다.

linuxhint_spark_app.sql( 'Agri_Table1에서 * 선택' ).보여주다()

# WHERE 절

linuxhint_spark_app.sql( 'Agri_Table1 WHERE Soil_status='건조'에서 * 선택' ).보여주다()

linuxhint_spark_app.sql( 'Agri_Table1 WHERE 에이커 > 2000에서 * 선택' ).보여주다()

산출:

  1. 첫 번째 쿼리는 DataFrame의 모든 열과 레코드를 표시합니다.
  2. 두 번째 쿼리는 'Soil_status' 열을 기반으로 레코드를 표시합니다. 'Dry' 요소가 있는 레코드는 3개뿐입니다.
  3. 마지막 쿼리는 'Acres'가 2000보다 큰 두 개의 레코드를 반환합니다.

Pyspark.sql.DataFrameWriter.insertInto()

insertInto() 함수를 사용하여 DataFrame을 기존 테이블에 추가할 수 있습니다. 이 함수를 selectExpr()과 함께 사용하여 열 이름을 정의한 다음 테이블에 삽입할 수 있습니다. 이 함수는 또한 tableName을 매개변수로 사용합니다.

통사론:

DataFrame_obj.write.insertInto('테이블 이름')

이 시나리오에서는 PySpark DataFrame에서 생성된 이전 테이블을 사용합니다. 환경에서 이전 시나리오 코드 스니펫을 구현해야 하는지 확인하십시오.

예:

두 개의 레코드가 있는 새 DataFrame을 만들고 'Agri_Table1' 테이블에 삽입합니다.

파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# 2행의 농업 데이터

농업 =[{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 2500 , '토양_상태' : '마른' ,
'국가' : '미국' },

{ '토양_유형' : '모래' , '관개_가용성' : '아니요' , '에이커스' : 1200 , '토양_상태' : '젖은' ,
'국가' : '일본' }]

# 위 데이터로 데이터프레임 생성

agri_df2 = linuxhint_spark_app.createDataFrame(농업)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( '에이커스' , '국가' , '관개_가용성' , '토양_유형' ,
'토양_상태' ).write.insertInto( 'Agri_Table1' )

# 최종 Agri_Table1 표시

linuxhint_spark_app.sql( 'Agri_Table1에서 * 선택' ).보여주다()

산출:

이제 DataFrame에 있는 총 행 수는 7입니다.

결론

이제 write.saveAsTable() 함수를 사용하여 PySpark DataFrame을 테이블에 쓰는 방법을 이해했습니다. 테이블 이름과 기타 선택적 매개변수를 사용합니다. 그런 다음 spark.read.table() 함수를 사용하여 이 테이블을 PySpark DataFrame에 로드했습니다. 경로/테이블 이름인 매개변수 하나만 사용합니다. 새 DataFrame을 기존 테이블에 추가하려면 insertInto() 함수를 사용하십시오.