PySpark DataFrame을 JSON으로 변환

Pyspark Dataframeeul Jsoneulo Byeonhwan



JSON을 사용하여 구조화된 데이터를 전송하는 것이 가능하며 메모리도 적게 소모합니다. PySpark RDD 또는 PySpark DataFrame과 비교할 때 JSON은 JSON으로 가능한 낮은 메모리 및 직렬화를 사용합니다. pyspark.sql.DataFrameWriter.json() 메서드를 사용하여 PySpark DataFrame을 JSON으로 변환할 수 있습니다. 그 외에도 DataFrame을 JSON으로 변환하는 두 가지 다른 방법이 있습니다.

내용 주제:

모든 예제에서 간단한 PySpark DataFrame을 고려하고 언급된 함수를 사용하여 JSON으로 변환해 봅시다.







필요한 모듈:

아직 설치되지 않은 경우 환경에 PySpark 라이브러리를 설치합니다. 다음 명령을 참조하여 설치할 수 있습니다.



핍 설치 pyspark

ToPandas()와 함께 To_json()을 사용하여 PySpark DataFrame에서 JSON으로

to_json() 메서드는 Pandas DataFrame을 JSON으로 변환하는 Pandas 모듈에서 사용할 수 있습니다. PySpark DataFrame을 Pandas DataFrame으로 변환하면 이 방법을 활용할 수 있습니다. PySpark DataFrame을 Pandas DataFrame으로 변환하기 위해 toPandas() 메서드를 사용합니다. 매개변수와 함께 to_json() 구문을 살펴보겠습니다.



통사론:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient는 변환된 JSON을 원하는 형식으로 표시하는 데 사용됩니다. '레코드', '테이블', '값', '열', '인덱스', '분할'이 필요합니다.
  2. 인덱스는 변환된 JSON 문자열에서 인덱스를 포함/제거하는 데 사용됩니다. 'True'로 설정하면 인덱스가 표시됩니다. 그렇지 않으면 방향이 '분할' 또는 '테이블'인 경우 인덱스가 표시되지 않습니다.

예 1: '레코드'로 방향 지정

3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. orient 매개변수를 'records'로 지정하여 이 DataFrame을 JSON으로 변환합니다.

파이스파크 가져오기

수입 판다

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

3행 4열의 스킬 데이터 #개

스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },

{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },

{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

# 실제 스킬 데이터

Skills_df.show()

# orient를 '레코드'로 사용하여 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '기록' )

인쇄(json_skills_data)

산출:



+---+------+-----+--------+

| 아이디|사람|상품| 스킬|

+---+------+-----+--------+

| 123 | 꿀| 25000 |그림|

| 112 | 무니| 2000년 | 댄스|

| 153 |툴라시| 1200 | 독서|

+---+------+-----+--------+

[{ 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' },{ 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' },{ 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }]

PySpark DataFrame이 값 사전이 있는 JSON 배열로 변환되는 것을 볼 수 있습니다. 여기서 키는 열 이름을 나타내고 값은 PySpark DataFrame의 행/셀 값을 나타냅니다.

예 2: '분할'로 방향 지정

'분할' 방향에 의해 반환되는 JSON 형식에는 열 목록, 인덱스 목록 및 데이터 목록이 있는 열 이름이 포함됩니다. 다음은 '분할' 오리엔트의 형식입니다.

# orient를 'split'으로 사용하여 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '나뉘다' )

인쇄(json_skills_data)

산출:

{ '열' :[ 'ID' , '사람' , '상' , '기술' ], '색인' :[ 0 , 1 , 2 ], '데이터' :[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]}

예 3: '인덱스'로 방향 지정

여기에서 PySpark DataFrame의 각 행은 키가 열 이름인 사전 형식으로 폐기됩니다. 각 사전에 대해 인덱스 위치가 키로 지정됩니다.

# orient를 'index'로 지정하여 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '색인' )

인쇄(json_skills_data)

산출:

{ '0' :{ 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' }, '1' :{ 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' }, '2' :{ 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }}

예 4: '열'로 방향 지정

열은 각 레코드의 키입니다. 각 열에는 색인 번호가 있는 열 값을 가져오는 사전이 있습니다.

# orient as 'columns'와 함께 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '열' )

인쇄(json_skills_data)

산출:

{ 'ID' :{ '0' : 123 , '1' : 112 , '2' : 153 }, '사람' :{ '0' : '꿀' , '1' : '무니' , '2' : '툴라시' }, '상' :{ '0' : 25000 , '1' : 2000년 , '2' : 1200 }, '기술' :{ '0' : '그림' , '1' : '춤' , '2' : '독서' }}

예 5: '값'으로 방향 지정

JSON의 값만 필요한 경우 '값' 방향으로 이동할 수 있습니다. 목록의 각 행을 표시합니다. 마지막으로 모든 목록이 목록에 저장됩니다. 이 JSON은 중첩 목록 유형입니다.

# orient를 'values'로 사용하여 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '가치' )

인쇄(json_skills_data)

산출:

[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]

예 6: '테이블'로 방향 지정

'테이블' 방향은 열 데이터 유형, 기본 키로서의 인덱스 및 Pandas 버전과 함께 필드 이름이 있는 스키마를 포함하는 JSON을 반환합니다. 값이 있는 열 이름은 '데이터'로 표시됩니다.

# orient as 'table'과 함께 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(orient= '테이블' )

인쇄(json_skills_data)

산출:

{ '개요' :{ '필드' :[{ '이름' : '색인' , '유형' : '정수' },{ '이름' : 'ID' , '유형' : '정수' },{ '이름' : '사람' , '유형' : '끈' },{ '이름' : '상' , '유형' : '정수' },{ '이름' : '기술' , '유형' : '끈' }], '기본 키' :[ '색인' ], 'pandas_version' : '1.4.0' }, '데이터' :[{ '색인' : 0 , 'ID' : 123 , '사람' : '꿀' , '상' : 25000 , '기술' : '그림' },{ '색인' : 1 , 'ID' : 112 , '사람' : '무니' , '상' : 2000년 , '기술' : '춤' },{ '색인' : 2 , 'ID' : 153 , '사람' : '툴라시' , '상' : 1200 , '기술' : '독서' }]}

예 7: 인덱스 매개변수 사용

먼저 인덱스 매개변수를 'True'로 설정하여 전달합니다. 인덱스 위치가 사전의 키로 반환되는 각 열 값에 대해 볼 수 있습니다.

두 번째 출력에서는 인덱스가 'False'로 설정되어 있으므로 인덱스 위치 없이 열 이름('columns')과 레코드('data')만 반환됩니다.

# index=True와 함께 to_json()을 사용하여 JSON으로 변환

json_skills_data = Skills_df.toPandas().to_json(색인=참)

인쇄(json_skills_data, ' \N ' )

# index=False로 to_json()을 사용하여 JSON으로 변환

json_skills_data= Skills_df.toPandas().to_json(인덱스=False,orient= '나뉘다' )

인쇄(json_skills_data)

산출:

{ 'ID' :{ '0' : 123 , '1' : 112 , '2' : 153 }, '사람' :{ '0' : '꿀' , '1' : '무니' , '2' : '툴라시' }, '상' :{ '0' : 25000 , '1' : 2000년 , '2' : 1200 }, '기술' :{ '0' : '그림' , '1' : '춤' , '2' : '독서' }}

{ '열' :[ 'ID' , '사람' , '상' , '기술' ], '데이터' :[[ 123 , '꿀' , 25000 , '그림' ],[ 112 , '무니' , 2000년 , '춤' ],[ 153 , '툴라시' , 1200 , '독서' ]]

ToJSON()을 사용하여 PySpark DataFrame에서 JSON으로

toJSON() 메서드는 PySpark DataFrame을 JSON 개체로 변환하는 데 사용됩니다. 기본적으로 목록으로 둘러싸인 JSON 문자열을 반환합니다. 그만큼 ['{열:값,...}',... ] 이 함수가 반환하는 형식입니다. 여기에서 PySpark DataFrame의 각 행은 열 이름을 키로 하는 사전으로 반환됩니다.

통사론:

dataframe_object.toJSON()

인덱스, 열 레이블 및 데이터 유형과 같은 매개변수를 전달할 수 있습니다.

예:

5개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. toJSON() 메서드를 사용하여 이 DataFrame을 JSON으로 변환합니다.

파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

5행 4열의 기술 데이터 #개

스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },

{ 'ID' : 112 , '사람' : '모니' , '기술' : '음악/춤' , '상' : 2000년 },

{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 },

{ 'ID' : 173 , '사람' : '란' , '기술' : '음악' , '상' : 2000년 },

{ 'ID' : 43 , '사람' : '카말라' , '기술' : '독서' , '상' : 10000 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

# 실제 스킬 데이터

Skills_df.show()

# JSON 배열로 변환

json_skills_data = Skills_df.toJSON().collect()

인쇄(json_skills_data)

산출:

+---+------+-----+-----------+

| 아이디|사람|상품| 스킬|

+---+------+-----+-----------+

| 123 | 꿀| 25000 | 회화|

| 112 | 무니| 2000년 |음악/춤|

| 153 |툴라시| 1200 | 독서|

| 173 | 란| 2000년 | 음악|

| 43 |카말라| 10000 | 독서|

+---+------+-----+-----------+

[ '{'id':123,'person':'꿀','prize':25000,'skill':'그림 그리기'}' , '{'id':112,'person':'무니','prize':2000,'skill':'music/dance'}' , '{'id':153,'person':'툴라시','prize':1200,'skill':'reading'}' , '{'id':173,'person':'달리기','prize':2000,'skill':'music'}' , '{'id':43,'person':'카말라','prize':10000,'skill':'reading'}' ]

PySpark DataFrame에는 5개의 행이 있습니다. 이 5개 행은 모두 쉼표로 구분된 문자열 사전으로 반환됩니다.

Write.json()을 사용하여 PySpark DataFrame에서 JSON으로

write.json() 메서드는 PySpark DataFrame을 JSON 파일에 쓰고 저장하는 PySpark에서 사용할 수 있습니다. 파일 이름/경로를 매개변수로 사용합니다. 기본적으로 JSON을 여러 파일(분할된 파일)로 반환합니다. 이들 모두를 하나의 파일로 병합하려면 coalesce() 메서드를 사용할 수 있습니다.

통사론:

dataframe_object.coalesce( 1 ).write.json('파일 이름')
  1. 추가 모드 – dataframe_object.write.mode('추가').json('파일 이름')
  2. 덮어쓰기 모드 – dataframe_object.write.mode('덮어쓰기').json('file_name')

기존 JSON을 추가/덮어쓸 수 있습니다. write.mode()를 사용하여 'append'를 전달하여 데이터를 추가하거나 이 함수에 'overwrite'를 전달하여 기존 JSON 데이터를 덮어쓸 수 있습니다.

예 1:

3개의 행과 4개의 열이 있는 'skills_df' PySpark DataFrame을 만듭니다. 이 DataFrame을 JSON에 씁니다.

파이스파크 가져오기

수입 판다

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

3행 4열의 스킬 데이터 #개

스킬 =[{ 'ID' : 123 , '사람' : '꿀' , '기술' : '그림' , '상' : 25000 },

{ 'ID' : 112 , '사람' : '모니' , '기술' : '춤' , '상' : 2000년 },

{ 'ID' : 153 , '사람' : '툴라시' , '기술' : '독서' , '상' : 1200 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills_df = linuxhint_spark_app.createDataFrame(기술)

# 쓰기.json()

Skills_df.합체( 1 ).write.json( 'skills_data' )

JSON 파일:

Skills_data 폴더에 분할된 JSON 데이터가 포함되어 있음을 알 수 있습니다.

JSON 파일을 열어봅시다. PySpark DataFrame의 모든 행이 JSON으로 변환되는 것을 볼 수 있습니다.

PySpark DataFrame에는 5개의 행이 있습니다. 이 5개 행은 모두 쉼표로 구분된 문자열 사전으로 반환됩니다.

예 2:

행이 하나 있는 'skills2_df' PySpark DataFrame을 만듭니다. 모드를 '추가'로 지정하여 이전 JSON 파일에 행 하나를 추가합니다.

파이스파크 가져오기

수입 판다

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

스킬2 =[{ 'ID' : 78 , '사람' : '메리' , '기술' : '승마' , '상' : 8960 }

]

# 위의 데이터에서 기술 데이터 프레임을 만듭니다.

Skills2_df = linuxhint_spark_app.createDataFrame(스킬2)

# 추가 모드로 write.json().

Skills2_df.write.mode( '추가' ).json( 'skills_data' )

JSON 파일:

분할된 JSON 파일을 볼 수 있습니다. 첫 번째 파일은 첫 번째 DataFrame 레코드를 보유하고 두 번째 파일은 두 번째 DataFrame 레코드를 보유합니다.

결론

PySpark DataFrame을 JSON으로 변환하는 세 가지 방법이 있습니다. 먼저 PySpark DataFrame을 Pandas DataFrame으로 변환하여 JSON으로 변환하는 to_json() 메서드에 대해 서로 다른 파라미터를 고려하여 다양한 예제를 통해 살펴보았다. 다음으로 toJSON() 메서드를 활용했습니다. 마지막으로 write.json() 함수를 사용하여 PySpark DataFrame을 JSON에 쓰는 방법을 배웠습니다. 이 기능으로 추가 및 덮어쓰기가 가능합니다.