내용 주제:
- Pandas DataFrame으로 변환하여 PySpark DataFrame을 CSV로 변환
- To_Csv() 메서드를 사용하여 PySpark Pandas DataFrame을 CSV로
- NumPy 배열로 변환하여 PySpark Pandas DataFrame을 CSV로 변환
- Write.Csv() 메서드를 사용하여 PySpark DataFrame을 CSV로
PySpark DataFrame 및 모듈 설치에 대해 알고 싶다면 다음을 참조하십시오. 기사 .
Pandas DataFrame으로 변환하여 PySpark DataFrame을 CSV로 변환
to_csv()는 Pandas DataFrame을 CSV로 변환하는 Pandas 모듈에서 사용할 수 있는 메서드입니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다. 이를 위해 toPandas() 메서드가 사용됩니다. 매개변수와 함께 to_csv() 구문을 살펴보겠습니다.
통사론:
pandas_dataframe_obj.to_csv(경로/ 'file_name.csv' , 머리글 ,색인, 열, 모드...)
- CSV 파일의 파일 이름을 지정해야 합니다. 다운로드한 CSV를 PC의 특정 위치에 저장하려면 파일 이름과 함께 경로를 지정할 수도 있습니다.
- 헤더가 'True'로 설정된 경우 열이 포함됩니다. 열이 필요하지 않은 경우 헤더를 'False'로 설정합니다.
- 인덱스가 'True'로 설정된 경우 인덱스가 지정됩니다. 인덱스가 필요하지 않은 경우 인덱스를 'False'로 설정합니다.
- 열 매개변수는 CSV 파일로 추출되는 특정 열을 지정할 수 있는 열 이름 목록을 사용합니다.
- mode 매개변수를 사용하여 레코드를 CSV에 추가할 수 있습니다. 추가 – 'a'를 사용합니다.
예 1: 헤더 및 인덱스 매개변수 사용
3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. 이 DataFrame을 먼저 Pandas DataFrame으로 변환하여 CSV로 변환합니다.
파이스파크 가져오기
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
3행 4열의 스킬 데이터 #개
스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },
{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },
{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
Skills_df.show()
# Skills_df를 pandas DataFrame으로 변환
pandas_skills_df= skills_df.toPandas()
인쇄(pandas_skills_df)
# 이 DataFrame을 헤더와 인덱스가 있는 csv로 변환합니다.
pandas_skills_df.to_csv( 'pandas_skills1.csv' , 머리글 =참, 지수=참)
산출:
PySpark DataFrame이 Pandas DataFrame으로 변환된 것을 볼 수 있습니다. 열 이름과 인덱스가 포함된 CSV로 변환되는지 살펴보겠습니다.
예 2: CSV에 데이터 추가
1개의 레코드가 있는 PySpark DataFrame을 하나 더 만들고 이를 첫 번째 예제의 일부로 생성된 CSV에 추가합니다. 모드 매개변수와 함께 헤더를 'False'로 설정해야 하는지 확인하십시오. 그렇지 않으면 열 이름도 행으로 추가됩니다.
파이스파크 가져오기pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
스킬 =[{ 'ID' : 90 , '사람' : '바르가브' , '기술' : '독서' , '상' : 12000 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
# Skills_df를 pandas DataFrame으로 변환
pandas_skills_df= skills_df.toPandas()
# 이 DataFrame을 pandas_skills1.csv 파일에 추가
pandas_skills_df.to_csv( 'pandas_skills1.csv' , 모드= 'ㅏ' , 머리글 =거짓)
CSV 출력:
CSV 파일에 새 행이 추가된 것을 볼 수 있습니다.
예 3: Columns 매개변수 사용
동일한 DataFrame을 가지고 'person'과 'prize'라는 두 개의 열이 있는 CSV로 변환해 보겠습니다.
파이스파크 가져오기pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
3행 4열의 스킬 데이터 #개
스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },
{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },
{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
# Skills_df를 pandas DataFrame으로 변환
pandas_skills_df= skills_df.toPandas()
# 이 DataFrame을 특정 열이 있는 csv로 변환
pandas_skills_df.to_csv( 'pandas_skills2.csv' , 열=[ '사람' , '상' ])
CSV 출력:
CSV 파일에는 'person' 및 'prize' 열만 존재하는 것을 볼 수 있습니다.
To_Csv() 메서드를 사용하여 PySpark Pandas DataFrame을 CSV로
to_csv()는 Pandas DataFrame을 CSV로 변환하는 Pandas 모듈에서 사용할 수 있는 메서드입니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다. 이를 위해 toPandas() 메서드가 사용됩니다. 매개변수와 함께 to_csv() 구문을 살펴보겠습니다.
통사론:
pyspark_pandas_dataframe_obj.to_csv(경로/ 'file_name.csv' , 머리글 ,색인,열,...)- CSV 파일의 파일 이름을 지정해야 합니다. 다운로드한 CSV를 PC의 특정 위치에 저장하려면 파일 이름과 함께 경로를 지정할 수도 있습니다.
- 헤더가 'True'로 설정된 경우 열이 포함됩니다. 열이 필요하지 않은 경우 헤더를 'False'로 설정합니다.
- 인덱스가 'True'로 설정된 경우 인덱스가 지정됩니다. 인덱스가 필요하지 않은 경우 인덱스를 'False'로 설정합니다.
- 열 매개변수는 CSV 파일로 추출되는 특정 열을 지정할 수 있는 열 이름 목록을 사용합니다.
예 1: Columns 매개변수 사용
열이 3개인 PySpark Pandas DataFrame을 만들고 'person' 및 'prize' 열과 함께 to_csv()를 사용하여 CSV로 변환합니다.
pyspark 가져오기 팬더에서pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , 삼 , 4 ]})
인쇄(pyspark_pandas_dataframe)
# 이 DataFrame을 특정 열이 있는 csv로 변환
pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , 열=[ '사람' , '상' ])
산출:
PySpark Pandas DataFrame이 두 개의 파티션이 있는 CSV로 변환되는 것을 볼 수 있습니다. 각 파티션에는 2개의 레코드가 있습니다. 또한 CSV의 열은 '사람'과 '상'만 있습니다.
파티션 파일 1:
파티션 파일 2:
예 2: 헤더 매개변수 사용
이전 DataFrame을 사용하고 'True'로 설정하여 헤더 매개변수를 지정합니다.
pyspark 가져오기 팬더에서pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , 삼 , 4 ]})
# 이 DataFrame을 헤더가 있는 csv로 변환합니다.
pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , 머리글 =참)
CSV 출력:
PySpark Pandas DataFrame이 두 개의 파티션이 있는 CSV로 변환되는 것을 볼 수 있습니다. 각 파티션에는 열 이름이 있는 2개의 레코드가 있습니다.
파티션 파일 1:
파티션 파일 2:
NumPy 배열로 변환하여 PySpark Pandas DataFrame을 CSV로 변환
Numpy 배열로 변환하여 PySpark Pandas DataFrame을 CSV로 변환하는 옵션이 있습니다. to_numpy()는 PySpark Pandas DataFrame을 NumPy 배열로 변환하는 PySpark Pandas 모듈에서 사용할 수 있는 메서드입니다.
통사론:
pyspark_pandas_dataframe_obj.to_numpy()어떤 매개변수도 사용하지 않습니다.
Tofile() 메서드 사용
NumPy 배열로 변환한 후 tofile() 메서드를 사용하여 NumPy를 CSV로 변환할 수 있습니다. 여기에서 각 레코드를 CSV 파일로 새 셀 열에 저장합니다.
통사론:
array_obj.to_numpy(파일명/경로,sep=' ')파일 이름 또는 CSV의 경로와 구분 기호를 사용합니다.
예:
3개의 열과 4개의 레코드가 있는 PySpark Pandas DataFrame을 생성하고 먼저 NumPy 배열로 변환하여 CSV로 변환합니다.
pyspark 가져오기 팬더에서pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , 삼 , 4 ]})
# 위의 DataFrame을 numpy 배열로 변환
변환됨 = pyspark_pandas_dataframe.to_numpy()
인쇄(변환)
# tofile() 사용하기
변환.tofile( 'converted1.csv' , 셉 = ',' )
산출:
[[ 90 '꿀' 1 ][ 78 '모니' 2 ]
[ 90 '그 자신' 삼 ]
[ 57 '라다' 4 ]]
PySpark Pandas DataFrame이 NumPy 배열(12개 값)로 변환되는 것을 볼 수 있습니다. CSV 데이터를 볼 수 있으면 각 셀 값을 새 열에 저장합니다.
Write.Csv() 메서드를 사용하여 PySpark DataFrame을 CSV로
write.csv() 메서드는 CSV 파일을 매개 변수로 저장해야 하는 파일 이름/경로를 사용합니다.
통사론:
dataframe_object.coalesce( 1 ).write.csv( '파일 이름' )실제로 CSV는 파티션(하나 이상)으로 저장됩니다. 이를 없애기 위해 분할된 모든 CSV 파일을 하나로 병합합니다. 이 시나리오에서는 coalesce() 함수를 사용합니다. 이제 PySpark DataFrame의 모든 행이 포함된 하나의 CSV 파일만 볼 수 있습니다.
예:
4개의 열이 있는 4개의 레코드가 있는 PySpark DataFrame을 고려하십시오. 이 DataFrame을 'market_details'라는 파일을 사용하여 CSV에 작성합니다.
파이스파크 가져오기pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
# 4개의 행과 4개의 열이 있는 시장 데이터
시장 =[{ 'm_id' : 'mz-001' , 'm_이름' : '알파벳' , 'm_city' : '델리' , 'm_state' : '델리' },
{ 'm_id' : 'mz-002' , 'm_이름' : 'XYZ' , 'm_city' : '파트나' , 'm_state' : '럭나우' },
{ 'm_id' : 'mz-003' , 'm_이름' : 'PQR' , 'm_city' : '플로리다' , 'm_state' : '하나' },
{ 'm_id' : 'mz-004' , 'm_이름' : '알파벳' , 'm_city' : '델리' , 'm_state' : '럭나우' }
]
# 위의 데이터에서 시장 데이터 프레임을 만듭니다.
market_df = linuxhint_spark_app.createDataFrame(시장)
# 실제 시장 데이터
market_df.show()
# 쓰기.csv()
market_df.coalesce( 1 ).write.csv( 'market_details' )
산출:
파일을 확인해 봅시다:
기록을 보려면 마지막 파일을 엽니다.
결론
서로 다른 매개변수를 고려하여 예제를 통해 PySpark DataFrame을 CSV로 변환하는 네 가지 시나리오를 배웠습니다. PySpark DataFrame으로 작업할 때 이 DataFrame을 CSV로 변환하는 두 가지 옵션이 있습니다. 하나는 write() 메서드를 사용하는 것이고 다른 하나는 Pandas DataFrame으로 변환하여 to_csv() 메서드를 사용하는 것입니다. PySpark Pandas DataFrame으로 작업하는 경우 NumPy 배열로 변환하여 to_csv() 및 tofile()을 활용할 수도 있습니다.