파이스파크 읽기.파켓()

Paiseupakeu Ilg Gi Pakes



PySpark에서 write.parquet() 함수는 DataFrame을 Parquet 파일에 쓰고 read.parquet()는 Parquet 파일을 PySpark DataFrame 또는 다른 DataSource로 읽습니다. Apache Spark에서 열을 빠르고 효율적으로 처리하려면 데이터를 압축해야 합니다. 데이터 압축은 메모리를 절약하고 모든 열은 플랫 레벨로 변환됩니다. 이는 플랫 컬럼 레벨 스토리지가 존재함을 의미합니다. 이를 저장하는 파일을 PARQUET 파일이라고 합니다.

이 가이드에서는 주로 pyspark.sql.DataFrameReader 클래스에서 사용할 수 있는 read.parquet() 함수를 사용하여 Parquet 파일을 PySpark DataFrame/SQL로 읽기/로드하는 데 중점을 둘 것입니다.

내용 주제:







Parquet 파일 가져오기



Parquet 파일을 PySpark DataFrame으로 읽기



Parquet 파일을 PySpark SQL로 읽기





Pyspark.sql.DataFrameReader.parquet()

이 함수는 parquet 파일을 읽고 PySpark DataFrame에 로드하는 데 사용됩니다. parquet 파일의 경로/파일 이름을 사용합니다. 일반 함수이므로 read.parquet() 함수를 간단히 사용할 수 있습니다.

통사론:



read.parquet()의 구문을 살펴보겠습니다.

spark_app.read.parquet(file_name.parquet/경로)

먼저 pip 명령을 사용하여 PySpark 모듈을 설치합니다.

핍 설치 pyspark

Parquet 파일 가져오기

Parquet 파일을 읽으려면 해당 데이터에서 Parquet 파일이 생성된 데이터가 필요합니다. 이 파트에서는 ​​PySpark DataFrame에서 parquet 파일을 생성하는 방법을 살펴봅니다.

5개의 레코드가 있는 PySpark DataFrame을 생성하고 이를 'industry_parquet' parquet 파일에 작성해 보겠습니다.

파이스파크 가져오기

pyspark.sql에서 가져오기 SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# 산업 세부 정보를 저장하는 데이터 프레임을 만듭니다.

industry_df = linuxhint_spark_app.createDataFrame([행(유형= '농업' ,면적= '미국' ,
평가= '더운' ,Total_employees= 100 ),

행(유형= '농업' ,면적= '인도' ,평점= '더운' ,Total_employees= 200 ),

행(유형= '개발' ,면적= '미국' ,평점= '따뜻한' ,Total_employees= 100 ),

행(유형= '교육' ,면적= '미국' ,평점= '시원한' ,Total_employees= 400 ),

행(유형= '교육' ,면적= '미국' ,평점= '따뜻한' ,Total_employees= 이십 )

])

# 실제 데이터프레임

industry_df.show()

# parquet 파일에 industry_df 쓰기

industry_df.coalesce( 1 ).write.parquet( 'industry_parquet' )

산출:

이것은 5개의 레코드를 보유하는 DataFrame입니다.

이전 DataFrame에 대해 쪽모이 세공 파일이 생성됩니다. 여기서 확장자가 있는 파일 이름은 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'입니다. 전체 자습서에서 이 파일을 사용합니다.

Parquet 파일을 PySpark DataFrame으로 읽기

쪽모이 세공 파일이 있습니다. read.parquet() 함수를 사용하여 이 파일을 읽고 PySpark DataFrame에 로드해 보겠습니다.

파이스파크 가져오기

pyspark.sql에서 가져오기 SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# parquet 파일을 dataframe_from_parquet 개체로 읽습니다.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( '부품-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# dataframe_from_parquet-DataFrame 표시

dataframe_from_parquet.show()

산출:

parquet 파일에서 생성된 show() 메서드를 사용하여 DataFrame을 표시합니다.

Parquet 파일을 사용한 SQL 쿼리

DataFrame에 로드한 후 SQL 테이블을 생성하고 DataFrame에 있는 데이터를 표시할 수 있습니다. TEMPORARY VIEW를 생성하고 SQL 명령을 사용하여 parquet 파일에서 생성된 DataFrame에서 레코드를 반환해야 합니다.

예 1:

'Sectors'라는 임시 보기를 만들고 SELECT 명령을 사용하여 DataFrame에 레코드를 표시합니다. 당신은 이것을 참조할 수 있습니다 지도 시간 Spark – SQL에서 VIEW를 만드는 방법을 설명합니다.

파이스파크 가져오기

pyspark.sql에서 가져오기 SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# parquet 파일을 dataframe_from_parquet 개체로 읽습니다.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( '부품-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# 'Sectors'라는 위의 쪽모이 세공 파일에서 보기 만들기

dataframe_from_parquet.createOrReplaceTempView( '섹터' )

# 섹터의 모든 레코드를 표시하는 쿼리

linuxhint_spark_app.sql( '섹터에서 * 선택' ).보여주다()

산출:

예 2:

이전 VIEW를 사용하여 SQL 쿼리를 작성합니다.

  1. '인도'에 속하는 섹터의 모든 레코드를 표시합니다.
  2. 직원이 100명 이상인 섹터의 모든 레코드를 표시합니다.
# '인도'에 속하는 섹터의 모든 레코드를 표시하도록 쿼리합니다.

linuxhint_spark_app.sql( 'Area='인도'인 섹터에서 * 선택' ).보여주다()

# 직원이 100명 이상인 섹터의 모든 레코드를 표시하는 쿼리

linuxhint_spark_app.sql( 'Total_employees>100인 섹터에서 * 선택' ).보여주다()

산출:

지역이 '인도'인 레코드는 하나뿐이고 직원 수가 100보다 큰 레코드는 두 개입니다.

Parquet 파일을 PySpark SQL로 읽기

먼저 CREATE 명령을 사용하여 VIEW를 생성해야 합니다. SQL 쿼리 내에서 '경로' 키워드를 사용하여 Parquet 파일을 Spark SQL로 읽을 수 있습니다. 경로 뒤에는 파일 이름/파일 위치를 지정해야 합니다.

통사론:

spark_app.sql( 'CREATE TEMPORARY VIEW view_name USING parquet OPTIONS(경로 ' file_name.parquet ')' )

예 1:

'Sector2'라는 임시 보기를 만들고 Parquet 파일을 읽습니다. sql() 함수를 사용하여 뷰에 있는 모든 레코드를 표시하는 선택 쿼리를 작성합니다.

파이스파크 가져오기

pyspark.sql에서 가져오기 SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

# Parquet 파일을 Spark-SQL로 읽기

linuxhint_spark_app.sql( '마루 옵션을 사용하여 임시 뷰 섹터2 생성(경로 ' 부품-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Sector2의 모든 레코드를 표시하는 쿼리

linuxhint_spark_app.sql( '섹터2에서 * 선택' ).보여주다()

산출:

예 2:

이전 VIEW를 사용하고 조회를 작성하여 등급이 'Hot' 또는 'Cool'인 모든 레코드를 표시하십시오.

# Rating-Hot 또는 Cool을 사용하여 Sector2의 모든 레코드를 표시하도록 쿼리합니다.

linuxhint_spark_app.sql( '등급='뜨거움' OR 등급='멋짐'인 Sector2에서 *를 선택하십시오.' ).보여주다()

산출:

'Hot' 또는 'Cool' 등급의 레코드가 세 개 있습니다.

결론

PySpark에서 write.parquet() 함수는 DataFrame을 parquet 파일에 씁니다. read.parquet() 함수는 Parquet 파일을 PySpark DataFrame 또는 다른 DataSource로 읽습니다. Parquet 파일을 PySpark DataFrame과 PySpark 테이블로 읽는 방법을 배웠습니다. 이 자습서의 일부로 PySpark DataFrame에서 테이블을 만들고 WHERE 절을 사용하여 데이터를 필터링하는 방법도 설명했습니다.