PySpark DataFrame을 CSV로 변환

Pyspark Dataframeeul Csvlo Byeonhwan



PySpark DataFrame을 CSV로 변환하는 네 가지 시나리오를 살펴보겠습니다. 직접 write.csv() 메서드를 사용하여 PySpark DataFrame을 CSV로 변환합니다. to_csv() 함수를 사용하여 PySpark Pandas DataFrame을 CSV로 변환합니다. NumPy 배열로 변환하여도 가능합니다.

내용 주제:

PySpark DataFrame 및 모듈 설치에 대해 알고 싶다면 다음을 참조하십시오. 기사 .







Pandas DataFrame으로 변환하여 PySpark DataFrame을 CSV로 변환

to_csv()는 Pandas DataFrame을 CSV로 변환하는 Pandas 모듈에서 사용할 수 있는 메서드입니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다. 이를 위해 toPandas() 메서드가 사용됩니다. 매개변수와 함께 to_csv() 구문을 살펴보겠습니다.



통사론:



pandas_dataframe_obj.to_csv(경로/ 'file_name.csv' , 머리글 ,색인, 열, 모드...)
  1. CSV 파일의 파일 이름을 지정해야 합니다. 다운로드한 CSV를 PC의 특정 위치에 저장하려면 파일 이름과 함께 경로를 지정할 수도 있습니다.
  2. 헤더가 'True'로 설정된 경우 열이 포함됩니다. 열이 필요하지 않은 경우 헤더를 'False'로 설정합니다.
  3. 인덱스가 'True'로 설정된 경우 인덱스가 지정됩니다. 인덱스가 필요하지 않은 경우 인덱스를 'False'로 설정합니다.
  4. 열 매개변수는 CSV 파일로 추출되는 특정 열을 지정할 수 있는 열 이름 목록을 사용합니다.
  5. mode 매개변수를 사용하여 레코드를 CSV에 추가할 수 있습니다. 추가 – 'a'를 사용합니다.

예 1: 헤더 및 인덱스 매개변수 사용

3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. 이 DataFrame을 먼저 Pandas DataFrame으로 변환하여 CSV로 변환합니다.





파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

3행 4열의 스킬 데이터 #개

스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },

{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },

{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

Skills_df.show()

# Skills_df를 pandas DataFrame으로 변환

pandas_skills_df= skills_df.toPandas()

인쇄(pandas_skills_df)

# 이 DataFrame을 헤더와 인덱스가 있는 csv로 변환합니다.

pandas_skills_df.to_csv( 'pandas_skills1.csv' , 머리글 =참, 지수=참)

산출:



PySpark DataFrame이 Pandas DataFrame으로 변환된 것을 볼 수 있습니다. 열 이름과 인덱스가 포함된 CSV로 변환되는지 살펴보겠습니다.

예 2: CSV에 데이터 추가

1개의 레코드가 있는 PySpark DataFrame을 하나 더 만들고 이를 첫 번째 예제의 일부로 생성된 CSV에 추가합니다. 모드 매개변수와 함께 헤더를 'False'로 설정해야 하는지 확인하십시오. 그렇지 않으면 열 이름도 행으로 추가됩니다.

파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

스킬 =[{ 'ID' : 90 , '사람' : '바르가브' , '기술' : '독서' , '상' : 12000 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

# Skills_df를 pandas DataFrame으로 변환

pandas_skills_df= skills_df.toPandas()

# 이 DataFrame을 pandas_skills1.csv 파일에 추가

pandas_skills_df.to_csv( 'pandas_skills1.csv' , 모드= 'ㅏ' , 머리글 =거짓)

CSV 출력:

CSV 파일에 새 행이 추가된 것을 볼 수 있습니다.

예 3: Columns 매개변수 사용

동일한 DataFrame을 가지고 'person'과 'prize'라는 두 개의 열이 있는 CSV로 변환해 보겠습니다.

파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

3행 4열의 스킬 데이터 #개

스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },

{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },

{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

# Skills_df를 pandas DataFrame으로 변환

pandas_skills_df= skills_df.toPandas()

# 이 DataFrame을 특정 열이 있는 csv로 변환

pandas_skills_df.to_csv( 'pandas_skills2.csv' , 열=[ '사람' , '상' ])

CSV 출력:

CSV 파일에는 'person' 및 'prize' 열만 존재하는 것을 볼 수 있습니다.

To_Csv() 메서드를 사용하여 PySpark Pandas DataFrame을 CSV로

to_csv()는 Pandas DataFrame을 CSV로 변환하는 Pandas 모듈에서 사용할 수 있는 메서드입니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다. 이를 위해 toPandas() 메서드가 사용됩니다. 매개변수와 함께 to_csv() 구문을 살펴보겠습니다.

통사론:

pyspark_pandas_dataframe_obj.to_csv(경로/ 'file_name.csv' , 머리글 ,색인,열,...)
  1. CSV 파일의 파일 이름을 지정해야 합니다. 다운로드한 CSV를 PC의 특정 위치에 저장하려면 파일 이름과 함께 경로를 지정할 수도 있습니다.
  2. 헤더가 'True'로 설정된 경우 열이 포함됩니다. 열이 필요하지 않은 경우 헤더를 'False'로 설정합니다.
  3. 인덱스가 'True'로 설정된 경우 인덱스가 지정됩니다. 인덱스가 필요하지 않은 경우 인덱스를 'False'로 설정합니다.
  4. 열 매개변수는 CSV 파일로 추출되는 특정 열을 지정할 수 있는 열 이름 목록을 사용합니다.

예 1: Columns 매개변수 사용

열이 3개인 PySpark Pandas DataFrame을 만들고 'person' 및 'prize' 열과 함께 to_csv()를 사용하여 CSV로 변환합니다.

pyspark 가져오기 팬더에서

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , , 4 ]})

인쇄(pyspark_pandas_dataframe)

# 이 DataFrame을 특정 열이 있는 csv로 변환

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , 열=[ '사람' , '상' ])

산출:

PySpark Pandas DataFrame이 두 개의 파티션이 있는 CSV로 변환되는 것을 볼 수 있습니다. 각 파티션에는 2개의 레코드가 있습니다. 또한 CSV의 열은 '사람'과 '상'만 있습니다.

파티션 파일 1:

파티션 파일 2:

예 2: 헤더 매개변수 사용

이전 DataFrame을 사용하고 'True'로 설정하여 헤더 매개변수를 지정합니다.

pyspark 가져오기 팬더에서

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , , 4 ]})

# 이 DataFrame을 헤더가 있는 csv로 변환합니다.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , 머리글 =참)

CSV 출력:

PySpark Pandas DataFrame이 두 개의 파티션이 있는 CSV로 변환되는 것을 볼 수 있습니다. 각 파티션에는 열 이름이 있는 2개의 레코드가 있습니다.

파티션 파일 1:

파티션 파일 2:

NumPy 배열로 변환하여 PySpark Pandas DataFrame을 CSV로 변환

Numpy 배열로 변환하여 PySpark Pandas DataFrame을 CSV로 변환하는 옵션이 있습니다. to_numpy()는 PySpark Pandas DataFrame을 NumPy 배열로 변환하는 PySpark Pandas 모듈에서 사용할 수 있는 메서드입니다.

통사론:

pyspark_pandas_dataframe_obj.to_numpy()

어떤 매개변수도 사용하지 않습니다.

Tofile() 메서드 사용

NumPy 배열로 변환한 후 tofile() 메서드를 사용하여 NumPy를 CSV로 변환할 수 있습니다. 여기에서 각 레코드를 CSV 파일로 새 셀 열에 저장합니다.

통사론:

array_obj.to_numpy(파일명/경로,sep=' ')

파일 이름 또는 CSV의 경로와 구분 기호를 사용합니다.

예:

3개의 열과 4개의 레코드가 있는 PySpark Pandas DataFrame을 생성하고 먼저 NumPy 배열로 변환하여 CSV로 변환합니다.

pyspark 가져오기 팬더에서

pyspark_pandas_dataframe=pandas.DataFrame({ 'ID' :[ 90 , 78 , 90 , 57 ], '사람' :[ '꿀' , '모니' , '그 자신' , '라다' ], '상' :[ 1 , 2 , , 4 ]})

# 위의 DataFrame을 numpy 배열로 변환

변환됨 = pyspark_pandas_dataframe.to_numpy()

인쇄(변환)

# tofile() 사용하기

변환.tofile( 'converted1.csv' , 셉 = ',' )

산출:

[[ 90 '꿀' 1 ]

[ 78 '모니' 2 ]

[ 90 '그 자신' ]

[ 57 '라다' 4 ]]

PySpark Pandas DataFrame이 NumPy 배열(12개 값)로 변환되는 것을 볼 수 있습니다. CSV 데이터를 볼 수 있으면 각 셀 값을 새 열에 저장합니다.

Write.Csv() 메서드를 사용하여 PySpark DataFrame을 CSV로

write.csv() 메서드는 CSV 파일을 매개 변수로 저장해야 하는 파일 이름/경로를 사용합니다.

통사론:

dataframe_object.coalesce( 1 ).write.csv( '파일 이름' )

실제로 CSV는 파티션(하나 이상)으로 저장됩니다. 이를 없애기 위해 분할된 모든 CSV 파일을 하나로 병합합니다. 이 시나리오에서는 coalesce() 함수를 사용합니다. 이제 PySpark DataFrame의 모든 행이 포함된 하나의 CSV 파일만 볼 수 있습니다.

예:

4개의 열이 있는 4개의 레코드가 있는 PySpark DataFrame을 고려하십시오. 이 DataFrame을 'market_details'라는 파일을 사용하여 CSV에 작성합니다.

파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# 4개의 행과 4개의 열이 있는 시장 데이터

시장 =[{ 'm_id' : 'mz-001' , 'm_이름' : '알파벳' , 'm_city' : '델리' , 'm_state' : '델리' },

{ 'm_id' : 'mz-002' , 'm_이름' : 'XYZ' , 'm_city' : '파트나' , 'm_state' : '럭나우' },

{ 'm_id' : 'mz-003' , 'm_이름' : 'PQR' , 'm_city' : '플로리다' , 'm_state' : '하나' },

{ 'm_id' : 'mz-004' , 'm_이름' : '알파벳' , 'm_city' : '델리' , 'm_state' : '럭나우' }

]



# 위의 데이터에서 시장 데이터 프레임을 만듭니다.

market_df = linuxhint_spark_app.createDataFrame(시장)

# 실제 시장 데이터

market_df.show()

# 쓰기.csv()

market_df.coalesce( 1 ).write.csv( 'market_details' )

산출:

파일을 확인해 봅시다:

기록을 보려면 마지막 파일을 엽니다.

결론

서로 다른 매개변수를 고려하여 예제를 통해 PySpark DataFrame을 CSV로 변환하는 네 가지 시나리오를 배웠습니다. PySpark DataFrame으로 작업할 때 이 DataFrame을 CSV로 변환하는 두 가지 옵션이 있습니다. 하나는 write() 메서드를 사용하는 것이고 다른 하나는 Pandas DataFrame으로 변환하여 to_csv() 메서드를 사용하는 것입니다. PySpark Pandas DataFrame으로 작업하는 경우 NumPy 배열로 변환하여 to_csv() 및 tofile()을 활용할 수도 있습니다.