파이스파크 판다스_Udf()

Paiseupakeu Pandaseu Udf



pandas_udf() 함수를 사용하여 PySpark DataFrame을 변환할 수 있습니다. 화살표로 PySpark DataFrame에 적용되는 사용자 정의 함수입니다. pandas_udf()를 사용하여 벡터화된 작업을 수행할 수 있습니다. 이 함수를 데코레이터로 전달하여 구현할 수 있습니다. 구문, 매개 변수 및 다양한 예를 알아보기 위해 이 가이드를 자세히 살펴보겠습니다.

내용 주제:

PySpark DataFrame 및 모듈 설치에 대해 알고 싶다면 다음을 참조하십시오. 기사 .







Pyspark.sql.functions.pandas_udf()

pandas_udf()는 'from' 키워드를 사용하여 가져올 수 있는 PySpark의 sql.functions 모듈에서 사용할 수 있습니다. PySpark DataFrame에서 벡터화된 작업을 수행하는 데 사용됩니다. 이 함수는 세 개의 매개변수를 전달하여 데코레이터처럼 구현됩니다. 그런 다음 화살표를 사용하여 데이터를 벡터 형식으로 반환하는 사용자 정의 함수(series/NumPy를 사용하는 것처럼)를 만들 수 있습니다. 이 함수 내에서 결과를 반환할 수 있습니다.



구조 및 구문:



먼저 이 함수의 구조와 구문을 살펴보겠습니다.

@pandas_udf(데이터 유형)
def function_name(작업) -> convert_format:
반환 문

여기서 function_name은 정의된 함수의 이름입니다. 데이터 유형은 이 함수가 반환하는 데이터 유형을 지정합니다. 'return' 키워드를 사용하여 결과를 반환할 수 있습니다. 모든 작업은 화살표 할당 기능 내에서 수행됩니다.





Pandas_udf(함수 및 반환 유형)

  1. 첫 번째 매개변수는 전달되는 사용자 정의 함수입니다.
  2. 두 번째 매개변수는 함수에서 반환 데이터 유형을 지정하는 데 사용됩니다.

데이터:

이 전체 가이드에서는 데모를 위해 하나의 PySpark DataFrame만 사용합니다. 우리가 정의하는 모든 사용자 정의 함수는 이 PySpark DataFrame에 적용됩니다. PySpark를 설치한 후 먼저 환경에서 이 DataFrame을 생성해야 합니다.



파이스파크 가져오기

pyspark.sql에서 SparkSession 가져오기

linuxhint_spark_app = SparkSession.builder.appName( '리눅스 힌트' ).getOrCreate()

pyspark.sql.functions에서 pandas_udf 가져오기

pyspark.sql.types 가져오기에서 *

판다를 판다로 가져오기

# 야채 정보

야채 =[{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 800 },

{ '유형' : '과일' , '이름' : '바나나' , '위치_국가' : '중국' , '수량' : 이십 },

{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 800 },

{ '유형' : '채소' , '이름' : '망고' , '위치_국가' : '일본' , '수량' : 0 },

{ '유형' : '과일' , '이름' : '레몬' , '위치_국가' : '인도' , '수량' : 1700년 },

{ '유형' : '채소' , '이름' : '토마토' , '위치_국가' : '미국' , '수량' : 1200 },

{ '유형' : '채소' , '이름' : '망고' , '위치_국가' : '일본' , '수량' : 0 },

{ '유형' : '과일' , '이름' : '레몬' , '위치_국가' : '인도' , '수량' : 0 }

]

# 위의 데이터에서 시장 데이터 프레임을 만듭니다.

market_df = linuxhint_spark_app.createDataFrame(야채)

market_df.show()

산출:

여기서는 4개의 열과 8개의 행으로 이 DataFrame을 만듭니다. 이제 pandas_udf()를 사용하여 사용자 정의 함수를 만들고 이러한 열에 적용합니다.

데이터 유형이 다른 Pandas_udf()

이 시나리오에서는 pandas_udf()를 사용하여 일부 사용자 정의 함수를 만들고 열에 적용하고 select() 메서드를 사용하여 결과를 표시합니다. 각각의 경우에 벡터화된 작업을 수행할 때 pandas.Series를 사용합니다. 이는 열 값을 1차원 배열로 간주하고 해당 열에 연산을 적용합니다. 데코레이터 자체에서 함수 반환 유형을 지정합니다.

예 1: 문자열 유형이 있는 Pandas_udf()

여기에서는 문자열 반환 유형을 사용하여 문자열 유형 열 값을 대문자와 소문자로 변환하는 두 개의 사용자 정의 함수를 만듭니다. 마지막으로 'type' 및 'locate_country' 열에 이러한 함수를 적용합니다.

# pandas_udf를 사용하여 유형 열을 대문자로 변환

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

i.str.upper() 반환

# pandas_udf를 사용하여 locate_country 열을 소문자로 변환

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

i.str.lower() 반환

# select()를 사용하여 열을 표시합니다.

market_df.select( '유형' ,type_upper_case( '유형' ), '위치_국가' ,
country_lower_case( '위치_국가' )).보여주다()

산출:

설명:

StringType() 함수는 pyspark.sql.types 모듈에서 사용할 수 있습니다. PySpark DataFrame을 생성하는 동안 이미 이 모듈을 가져왔습니다.

  1. 먼저 UDF(사용자 정의 함수)는 str.upper() 함수를 사용하여 문자열을 대문자로 반환합니다. str.upper()는 주어진 문자열을 대문자로 변환하는 시리즈 데이터 구조(함수 내부에 화살표가 있는 시리즈로 변환할 때)에서 사용할 수 있습니다. 마지막으로 이 함수는 select() 메서드 내부에 지정된 'type' 열에 적용됩니다. 이전에는 유형 열의 모든 문자열이 소문자였습니다. 이제 대문자로 변경되었습니다.
  2. 둘째, UDF는 str.lower() 함수를 사용하여 문자열을 대문자로 반환합니다. str.lower()는 주어진 문자열을 소문자로 변환하는 시리즈 데이터 구조에서 사용할 수 있습니다. 마지막으로 이 함수는 select() 메서드 내부에 지정된 'type' 열에 적용됩니다. 이전에는 유형 열의 모든 문자열이 대문자였습니다. 이제 소문자로 변경됩니다.

예제 2: 정수 유형의 Pandas_udf()

PySpark DataFrame 정수 열을 Pandas 시리즈로 변환하고 각 값에 100을 더하는 UDF를 생성해 보겠습니다. select() 메서드 내부의 이 함수에 '수량' 열을 전달합니다.

# 100 더하기

@pandas_udf(정수형())

def add_100(i: panda.Series) -> panda.Series:

i+ 반환 100

# 위의 함수에 수량 열을 전달하고 표시합니다.

market_df.select( '수량' ,추가_100( '수량' )).보여주다()

산출:

설명:

UDF 내에서 모든 값을 반복하고 시리즈로 변환합니다. 그런 다음 시리즈의 각 값에 100을 더합니다. 마지막으로 '수량' 열을 이 함수에 전달하면 모든 값에 100이 추가되는 것을 볼 수 있습니다.

Groupby() 및 Agg()를 사용하여 데이터 유형이 다른 Pandas_udf()

집계 열에 UDF를 전달하는 예를 살펴보겠습니다. 여기서는 groupby() 함수를 사용하여 열 값을 먼저 그룹화하고 agg() 함수를 사용하여 집계를 수행합니다. 이 집계 함수 내에서 UDF를 전달합니다.

통사론:

pyspark_dataframe_object.groupby( '그룹화_열' ).agg(UDF
(pyspark_dataframe_object[ '열' ]))

여기에서 그룹화 열의 값이 먼저 그룹화됩니다. 그런 다음 UDF와 관련하여 그룹화된 각 데이터에 대해 집계가 수행됩니다.

예제 1: Pandas_udf()와 Aggregate Mean()

여기에서는 반환 유형이 float인 사용자 정의 함수를 만듭니다. 함수 내에서 mean() 함수를 사용하여 평균을 계산합니다. 이 UDF는 각 유형의 평균 수량을 가져오기 위해 '수량' 열로 전달됩니다.

# 평균을 반환

@pandas_udf( '뜨다' )

def average_function(i: panda.Series) -> float:

반환 i.mean()

# 유형 열을 그룹화하여 수량 열을 함수에 전달합니다.

market_df.groupby( '유형' ).agg(average_function(market_df[ '수량' ])).보여주다()

산출:

'유형' 열의 요소를 기반으로 그룹화하고 있습니다. '과일'과 '채소'의 두 그룹이 형성됩니다. 각 그룹에 대해 평균이 계산되어 반환됩니다.

예 2: 집계 Max() 및 Min()이 포함된 Pandas_udf()

여기에서는 정수(int) 반환 유형을 사용하여 두 개의 사용자 정의 함수를 만듭니다. 첫 번째 UDF는 최소값을 반환하고 두 번째 UDF는 최대값을 반환합니다.

# 최소값을 반환하는 pandas_udf

@pandas_udf( '정수' )

def min_(i: panda.Series) -> int:

i.min() 반환

# 최대값을 반환하는 pandas_udf

@pandas_udf( '정수' )

def max_(i: panda.Series) -> int:

i.max() 반환

# locate_country를 그룹화하여 min_pandas_udf에 수량 열을 전달합니다.

market_df.groupby( '위치_국가' ).agg(min_(시장_df[ '수량' ])).보여주다()

# locate_country를 그룹화하여 max_pandas_udf에 수량 열을 전달합니다.

market_df.groupby( '위치_국가' ).agg(max_(시장_df[ '수량' ])).보여주다()

산출:

최소값과 최대값을 반환하기 위해 UDF의 반환 유형에서 min() 및 max() 함수를 활용합니다. 이제 'locate_country' 열의 데이터를 그룹화합니다. 4개의 그룹이 형성됩니다('CHINA', 'INDIA', 'JAPAN', 'USA'). 각 그룹에 대해 최대 수량을 반환합니다. 마찬가지로 최소 수량을 반환합니다.

결론

기본적으로 pandas_udf()는 PySpark DataFrame에서 벡터화된 작업을 수행하는 데 사용됩니다. pandas_udf()를 생성하고 이를 PySpark DataFrame에 적용하는 방법을 살펴보았습니다. 더 나은 이해를 위해 모든 데이터 유형(문자열, 부동 소수점 및 정수)을 고려하여 다양한 예를 논의했습니다. agg() 함수를 통해 groupby()와 함께 pandas_udf()를 사용할 수 있습니다.