내용 주제:
PySpark DataFrame 및 모듈 설치에 대해 알고 싶다면 다음을 참조하십시오. 기사 .
Pyspark.sql.functions.pandas_udf()
pandas_udf()는 'from' 키워드를 사용하여 가져올 수 있는 PySpark의 sql.functions 모듈에서 사용할 수 있습니다. PySpark DataFrame에서 벡터화된 작업을 수행하는 데 사용됩니다. 이 함수는 세 개의 매개변수를 전달하여 데코레이터처럼 구현됩니다. 그런 다음 화살표를 사용하여 데이터를 벡터 형식으로 반환하는 사용자 정의 함수(series/NumPy를 사용하는 것처럼)를 만들 수 있습니다. 이 함수 내에서 결과를 반환할 수 있습니다.
구조 및 구문:
먼저 이 함수의 구조와 구문을 살펴보겠습니다.
@pandas_udf(데이터 유형)def function_name(작업) -> convert_format:
반환 문
여기서 function_name은 정의된 함수의 이름입니다. 데이터 유형은 이 함수가 반환하는 데이터 유형을 지정합니다. 'return' 키워드를 사용하여 결과를 반환할 수 있습니다. 모든 작업은 화살표 할당 기능 내에서 수행됩니다.
Pandas_udf(함수 및 반환 유형)
- 첫 번째 매개변수는 전달되는 사용자 정의 함수입니다.
- 두 번째 매개변수는 함수에서 반환 데이터 유형을 지정하는 데 사용됩니다.
데이터:
이 전체 가이드에서는 데모를 위해 하나의 PySpark DataFrame만 사용합니다. 우리가 정의하는 모든 사용자 정의 함수는 이 PySpark DataFrame에 적용됩니다. PySpark를 설치한 후 먼저 환경에서 이 DataFrame을 생성해야 합니다.
파이스파크 가져오기
pyspark.sql에서 SparkSession 가져오기
linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()
pyspark.sql.functions에서 pandas_udf 가져오기
pyspark.sql.types 가져오기에서 *
판다를 판다로 가져오기
# 야채 정보
야채 =[{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 800 },
{ '유형' : '과일' , '이름' : '바나나' , '위치_국가' : '중국' , '수량' : 이십 },
{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 800 },
{ '유형' : '채소' , '이름' : '망고' , '위치_국가' : '일본' , '수량' : 0 },
{ '유형' : '과일' , '이름' : '레몬' , '위치_국가' : '인도' , '수량' : 1700년 },
{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 1200 },
{ '유형' : '채소' , '이름' : '망고' , '위치_국가' : '일본' , '수량' : 0 },
{ '유형' : '과일' , '이름' : '레몬' , '위치_국가' : '인도' , '수량' : 0 }
]
# 위의 데이터에서 시장 데이터 프레임을 만듭니다.
market_df = linuxhint_spark_app.createDataFrame(야채)
market_df.show()
산출:
여기서는 4개의 열과 8개의 행으로 이 DataFrame을 만듭니다. 이제 pandas_udf()를 사용하여 사용자 정의 함수를 만들고 이러한 열에 적용합니다.
데이터 유형이 다른 Pandas_udf()
이 시나리오에서는 pandas_udf()를 사용하여 일부 사용자 정의 함수를 만들고 열에 적용하고 select() 메서드를 사용하여 결과를 표시합니다. 각각의 경우에 벡터화된 작업을 수행할 때 pandas.Series를 사용합니다. 이는 열 값을 1차원 배열로 간주하고 해당 열에 연산을 적용합니다. 데코레이터 자체에서 함수 반환 유형을 지정합니다.
예 1: 문자열 유형이 있는 Pandas_udf()
여기에서는 문자열 반환 유형을 사용하여 문자열 유형 열 값을 대문자와 소문자로 변환하는 두 개의 사용자 정의 함수를 만듭니다. 마지막으로 'type' 및 'locate_country' 열에 이러한 함수를 적용합니다.
# pandas_udf를 사용하여 유형 열을 대문자로 변환@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
i.str.upper() 반환
# pandas_udf를 사용하여 locate_country 열을 소문자로 변환
@pandas_udf(StringType())
def country_lower_case(i: panda.Series) -> panda.Series:
i.str.lower() 반환
# select()를 사용하여 열을 표시합니다.
market_df.select( '유형' ,type_upper_case( '유형' ), '위치_국가' ,
country_lower_case( '위치_국가' )).보여주다()
산출:
설명:
StringType() 함수는 pyspark.sql.types 모듈에서 사용할 수 있습니다. PySpark DataFrame을 생성하는 동안 이미 이 모듈을 가져왔습니다.
- 먼저 UDF(사용자 정의 함수)는 str.upper() 함수를 사용하여 문자열을 대문자로 반환합니다. str.upper()는 주어진 문자열을 대문자로 변환하는 시리즈 데이터 구조(함수 내부에 화살표가 있는 시리즈로 변환할 때)에서 사용할 수 있습니다. 마지막으로 이 함수는 select() 메서드 내부에 지정된 'type' 열에 적용됩니다. 이전에는 유형 열의 모든 문자열이 소문자였습니다. 이제 대문자로 변경되었습니다.
- 둘째, UDF는 str.lower() 함수를 사용하여 문자열을 대문자로 반환합니다. str.lower()는 주어진 문자열을 소문자로 변환하는 시리즈 데이터 구조에서 사용할 수 있습니다. 마지막으로 이 함수는 select() 메서드 내부에 지정된 'type' 열에 적용됩니다. 이전에는 유형 열의 모든 문자열이 대문자였습니다. 이제 소문자로 변경됩니다.
예제 2: 정수 유형의 Pandas_udf()
PySpark DataFrame 정수 열을 Pandas 시리즈로 변환하고 각 값에 100을 더하는 UDF를 생성해 보겠습니다. select() 메서드 내부의 이 함수에 '수량' 열을 전달합니다.
# 100 더하기@pandas_udf(정수형())
def add_100(i: panda.Series) -> panda.Series:
i+ 반환 100
# 위의 함수에 수량 열을 전달하고 표시합니다.
market_df.select( '수량' ,추가_100( '수량' )).보여주다()
산출:
설명:
UDF 내에서 모든 값을 반복하고 시리즈로 변환합니다. 그런 다음 시리즈의 각 값에 100을 더합니다. 마지막으로 '수량' 열을 이 함수에 전달하면 모든 값에 100이 추가되는 것을 볼 수 있습니다.
Groupby() 및 Agg()를 사용하여 데이터 유형이 다른 Pandas_udf()
집계 열에 UDF를 전달하는 예를 살펴보겠습니다. 여기서는 groupby() 함수를 사용하여 열 값을 먼저 그룹화하고 agg() 함수를 사용하여 집계를 수행합니다. 이 집계 함수 내에서 UDF를 전달합니다.
통사론:
pyspark_dataframe_object.groupby( '그룹화_열' ).agg(UDF(pyspark_dataframe_object[ '열' ]))
여기에서 그룹화 열의 값이 먼저 그룹화됩니다. 그런 다음 UDF와 관련하여 그룹화된 각 데이터에 대해 집계가 수행됩니다.
예제 1: Pandas_udf()와 Aggregate Mean()
여기에서는 반환 유형이 float인 사용자 정의 함수를 만듭니다. 함수 내에서 mean() 함수를 사용하여 평균을 계산합니다. 이 UDF는 각 유형의 평균 수량을 가져오기 위해 '수량' 열로 전달됩니다.
# 평균을 반환@pandas_udf( '뜨다' )
def average_function(i: panda.Series) -> float:
반환 i.mean()
# 유형 열을 그룹화하여 수량 열을 함수에 전달합니다.
market_df.groupby( '유형' ).agg(average_function(market_df[ '수량' ])).보여주다()
산출:
'유형' 열의 요소를 기반으로 그룹화하고 있습니다. '과일'과 '채소'의 두 그룹이 형성됩니다. 각 그룹에 대해 평균이 계산되어 반환됩니다.
예 2: 집계 Max() 및 Min()이 포함된 Pandas_udf()
여기에서는 정수(int) 반환 유형을 사용하여 두 개의 사용자 정의 함수를 만듭니다. 첫 번째 UDF는 최소값을 반환하고 두 번째 UDF는 최대값을 반환합니다.
# 최소값을 반환하는 pandas_udf@pandas_udf( '정수' )
def min_(i: panda.Series) -> int:
i.min() 반환
# 최대값을 반환하는 pandas_udf
@pandas_udf( '정수' )
def max_(i: panda.Series) -> int:
i.max() 반환
# locate_country를 그룹화하여 min_pandas_udf에 수량 열을 전달합니다.
market_df.groupby( '위치_국가' ).agg(min_(시장_df[ '수량' ])).보여주다()
# locate_country를 그룹화하여 max_pandas_udf에 수량 열을 전달합니다.
market_df.groupby( '위치_국가' ).agg(max_(시장_df[ '수량' ])).보여주다()
산출:
최소값과 최대값을 반환하기 위해 UDF의 반환 유형에서 min() 및 max() 함수를 활용합니다. 이제 'locate_country' 열의 데이터를 그룹화합니다. 4개의 그룹이 형성됩니다('CHINA', 'INDIA', 'JAPAN', 'USA'). 각 그룹에 대해 최대 수량을 반환합니다. 마찬가지로 최소 수량을 반환합니다.
결론
기본적으로 pandas_udf()는 PySpark DataFrame에서 벡터화된 작업을 수행하는 데 사용됩니다. pandas_udf()를 생성하고 이를 PySpark DataFrame에 적용하는 방법을 살펴보았습니다. 더 나은 이해를 위해 모든 데이터 유형(문자열, 부동 소수점 및 정수)을 고려하여 다양한 예를 논의했습니다. agg() 함수를 통해 groupby()와 함께 pandas_udf()를 사용할 수 있습니다.