내용 주제:
- ToPandas()와 함께 To_json()을 사용하여 PySpark DataFrame에서 JSON으로
- ToJSON()을 사용하여 PySpark DataFrame에서 JSON으로
- Write.json()을 사용하여 PySpark DataFrame에서 JSON으로
모든 예제에서 간단한 PySpark DataFrame을 고려하고 언급된 함수를 사용하여 JSON으로 변환해 봅시다.
필요한 모듈:
아직 설치되지 않은 경우 환경에 PySpark 라이브러리를 설치합니다. 다음 명령을 참조하여 설치할 수 있습니다.
핍 설치 pyspark
ToPandas()와 함께 To_json()을 사용하여 PySpark DataFrame에서 JSON으로
to_json() 메서드는 Pandas DataFrame을 JSON으로 변환하는 Pandas 모듈에서 사용할 수 있습니다. PySpark DataFrame을 Pandas DataFrame으로 변환하면 이 방법을 활용할 수 있습니다. PySpark DataFrame을 Pandas DataFrame으로 변환하기 위해 toPandas() 메서드를 사용합니다. 매개변수와 함께 to_json() 구문을 살펴보겠습니다.
통사론:
dataframe_object.toPandas().to_json(orient,index,...)
- Orient는 변환된 JSON을 원하는 형식으로 표시하는 데 사용됩니다. '레코드', '테이블', '값', '열', '인덱스', '분할'이 필요합니다.
- 인덱스는 변환된 JSON 문자열에서 인덱스를 포함/제거하는 데 사용됩니다. 'True'로 설정하면 인덱스가 표시됩니다. 그렇지 않으면 방향이 '분할' 또는 '테이블'인 경우 인덱스가 표시되지 않습니다.
예 1: '레코드'로 방향 지정
3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. orient 매개변수를 'records'로 지정하여 이 DataFrame을 JSON으로 변환합니다.
파이스파크 가져오기수입 판다
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
3행 4열의 스킬 데이터 #개
스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },
{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },
{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
# 실제 스킬 데이터
Skills_df.show()
# orient를 '레코드'로 사용하여 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '기록' )
인쇄(json_skills_data)
산출:
+---+------+-----+--------+
| 아이디|사람|상품| 스킬|
+---+------+-----+--------+
| 123 | 꿀| 25000 |그림|
| 112 | 무니| 2000년 | 댄스|
| 153 |툴라시| 1200 | 독서|
+---+------+-----+--------+
[{ 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' },{ 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' },{ 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }]
PySpark DataFrame이 값 사전이 있는 JSON 배열로 변환되는 것을 볼 수 있습니다. 여기서 키는 열 이름을 나타내고 값은 PySpark DataFrame의 행/셀 값을 나타냅니다.
예 2: '분할'로 방향 지정
'분할' 방향에 의해 반환되는 JSON 형식에는 열 목록, 인덱스 목록 및 데이터 목록이 있는 열 이름이 포함됩니다. 다음은 '분할' 오리엔트의 형식입니다.
# orient를 'split'으로 사용하여 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '나뉘다' )
인쇄(json_skills_data)
산출:
{ '열' :[ 'ID' , '사람' , '상' , '기술' ], '색인' :[ 0 , 1 , 2 ], '데이터' :[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]}예 3: '인덱스'로 방향 지정
여기에서 PySpark DataFrame의 각 행은 키가 열 이름인 사전 형식으로 폐기됩니다. 각 사전에 대해 인덱스 위치가 키로 지정됩니다.
# orient를 'index'로 지정하여 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '색인' )
인쇄(json_skills_data)
산출:
{ '0' :{ 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' }, '1' :{ 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' }, '2' :{ 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }}예 4: '열'로 방향 지정
열은 각 레코드의 키입니다. 각 열에는 색인 번호가 있는 열 값을 가져오는 사전이 있습니다.
# orient as 'columns'와 함께 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '열' )
인쇄(json_skills_data)
산출:
{ 'ID' :{ '0' : 123 , '1' : 112 , '2' : 153 }, '사람' :{ '0' : '꿀' , '1' : '무니' , '2' : '툴라시' }, '상' :{ '0' : 25000 , '1' : 2000년 , '2' : 1200 }, '기술' :{ '0' : '그림' , '1' : '춤' , '2' : '독서' }}예 5: '값'으로 방향 지정
JSON의 값만 필요한 경우 '값' 방향으로 이동할 수 있습니다. 목록의 각 행을 표시합니다. 마지막으로 모든 목록이 목록에 저장됩니다. 이 JSON은 중첩 목록 유형입니다.
# orient를 'values'로 사용하여 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '가치' )
인쇄(json_skills_data)
산출:
[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]예 6: '테이블'로 방향 지정
'테이블' 방향은 열 데이터 유형, 기본 키로서의 인덱스 및 Pandas 버전과 함께 필드 이름이 있는 스키마를 포함하는 JSON을 반환합니다. 값이 있는 열 이름은 '데이터'로 표시됩니다.
# orient as 'table'과 함께 to_json()을 사용하여 JSON으로 변환
json_skills_data = Skills_df.toPandas().to_json(orient= '테이블' )
인쇄(json_skills_data)
산출:
{ '개요' :{ '필드' :[{ '이름' : '색인' , '유형' : '정수' },{ '이름' : 'ID' , '유형' : '정수' },{ '이름' : '사람' , '유형' : '끈' },{ '이름' : '상' , '유형' : '정수' },{ '이름' : '기술' , '유형' : '끈' }], '기본 키' :[ '색인' ], 'pandas_version' : '1.4.0' }, '데이터' :[{ '색인' : 0 , 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' },{ '색인' : 1 , 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' },{ '색인' : 2 , 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }]}예 7: 인덱스 매개변수 사용
먼저 인덱스 매개변수를 'True'로 설정하여 전달합니다. 인덱스 위치가 사전의 키로 반환되는 각 열 값에 대해 볼 수 있습니다.
두 번째 출력에서는 인덱스가 'False'로 설정되어 있으므로 인덱스 위치 없이 열 이름('columns')과 레코드('data')만 반환됩니다.
# index=True와 함께 to_json()을 사용하여 JSON으로 변환json_skills_data = Skills_df.toPandas().to_json(색인=참)
인쇄(json_skills_data, ' \N ' )
# index=False로 to_json()을 사용하여 JSON으로 변환
json_skills_data= Skills_df.toPandas().to_json(인덱스=False,orient= '나뉘다' )
인쇄(json_skills_data)
산출:
{ 'ID' :{ '0' : 123 , '1' : 112 , '2' : 153 }, '사람' :{ '0' : '꿀' , '1' : '무니' , '2' : '툴라시' }, '상' :{ '0' : 25000 , '1' : 2000년 , '2' : 1200 }, '기술' :{ '0' : '그림' , '1' : '춤' , '2' : '독서' }}{ '열' :[ 'ID' , '사람' , '상' , '기술' ], '데이터' :[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]
ToJSON()을 사용하여 PySpark DataFrame에서 JSON으로
toJSON() 메서드는 PySpark DataFrame을 JSON 개체로 변환하는 데 사용됩니다. 기본적으로 목록으로 둘러싸인 JSON 문자열을 반환합니다. 그만큼 ['{열:값,...}',... ] 이 함수가 반환하는 형식입니다. 여기에서 PySpark DataFrame의 각 행은 열 이름을 키로 하는 사전으로 반환됩니다.
통사론:
dataframe_object.toJSON()인덱스, 열 레이블 및 데이터 유형과 같은 매개변수를 전달할 수 있습니다.
예:
5개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. toJSON() 메서드를 사용하여 이 DataFrame을 JSON으로 변환합니다.
파이스파크 가져오기pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
5행 4열의 기술 데이터 #개
스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },
{ 'ID' : 112 , '사람' : '모니' , '기술' : '음악/춤' , '상' : 2000년 },
{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 },
{ 'ID' : 173 , '사람' : '란' , '기술' : '음악' , '상' : 2000년 },
{ 'ID' : 43 , '사람' : '카말라' , '기술' : '독서' , '상' : 10000 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
# 실제 스킬 데이터
Skills_df.show()
# JSON 배열로 변환
json_skills_data = Skills_df.toJSON().collect()
인쇄(json_skills_data)
산출:
+---+------+-----+-----------+| 아이디|사람|상품| 스킬|
+---+------+-----+-----------+
| 123 | 꿀| 25000 | 회화|
| 112 | 무니| 2000년 |음악/춤|
| 153 |툴라시| 1200 | 독서|
| 173 | 란| 2000년 | 음악|
| 43 |카말라| 10000 | 독서|
+---+------+-----+-----------+
[ '{'id':123,'person':'꿀','prize':25000,'skill':'그림 그리기'}' , '{'id':112,'person':'무니','prize':2000,'skill':'music/dance'}' , '{'id':153,'person':'툴라시','prize':1200,'skill':'reading'}' , '{'id':173,'person':'달리기','prize':2000,'skill':'music'}' , '{'id':43,'person':'카말라','prize':10000,'skill':'reading'}' ]
PySpark DataFrame에는 5개의 행이 있습니다. 이 5개 행은 모두 쉼표로 구분된 문자열 사전으로 반환됩니다.
Write.json()을 사용하여 PySpark DataFrame에서 JSON으로
write.json() 메서드는 PySpark DataFrame을 JSON 파일에 쓰고 저장하는 PySpark에서 사용할 수 있습니다. 파일 이름/경로를 매개변수로 사용합니다. 기본적으로 JSON을 여러 파일(분할된 파일)로 반환합니다. 이들 모두를 하나의 파일로 병합하려면 coalesce() 메서드를 사용할 수 있습니다.
통사론:
dataframe_object.coalesce( 1 ).write.json('파일 이름')- 추가 모드 – dataframe_object.write.mode('추가').json('파일 이름')
- 덮어쓰기 모드 – dataframe_object.write.mode('덮어쓰기').json('file_name')
기존 JSON을 추가/덮어쓸 수 있습니다. write.mode()를 사용하여 'append'를 전달하여 데이터를 추가하거나 이 함수에 'overwrite'를 전달하여 기존 JSON 데이터를 덮어쓸 수 있습니다.
예 1:
3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. 이 DataFrame을 JSON에 씁니다.
파이스파크 가져오기수입 판다
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
3행 4열의 스킬 데이터 #개
스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },
{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },
{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills_df = linuxhint_spark_app.createDataFrame(기술)
# 쓰기.json()
Skills_df.합체( 1 ).write.json( 'skills_data' )
JSON 파일:
Skills_data 폴더에 분할된 JSON 데이터가 포함되어 있음을 알 수 있습니다.
JSON 파일을 열어봅시다. PySpark DataFrame의 모든 행이 JSON으로 변환되는 것을 볼 수 있습니다.
PySpark DataFrame에는 5개의 행이 있습니다. 이 5개 행은 모두 쉼표로 구분된 문자열 사전으로 반환됩니다.
예 2:
행이 하나 있는 'skills2_df' PySpark DataFrame을 만듭니다. 모드를 '추가'로 지정하여 이전 JSON 파일에 행 하나를 추가합니다.
파이스파크 가져오기수입 판다
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
스킬2 =[{ 'ID' : 78 , '사람' : '메리' , '기술' : '승마' , '상' : 8960 }
]
# 위의 데이터에서 기술 데이터 프레임을 만듭니다.
Skills2_df = linuxhint_spark_app.createDataFrame(스킬2)
# 추가 모드로 write.json().
Skills2_df.write.mode( '추가' ).json( 'skills_data' )
JSON 파일:
분할된 JSON 파일을 볼 수 있습니다. 첫 번째 파일은 첫 번째 DataFrame 레코드를 보유하고 두 번째 파일은 두 번째 DataFrame 레코드를 보유합니다.
결론
PySpark DataFrame을 JSON으로 변환하는 세 가지 방법이 있습니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환하여 JSON으로 변환하는 to_json() 메서드에 대해 서로 다른 파라미터를 고려하여 다양한 예제를 통해 살펴보았다. 다음으로 toJSON() 메서드를 활용했습니다. 마지막으로 write.json() 함수를 사용하여 PySpark DataFrame을 JSON에 쓰는 방법을 배웠습니다. 이 기능으로 추가 및 덮어쓰기가 가능합니다.