본문 바로가기

Data/Data Analysis

(43)
[Snowflake] strtok 여러 delimiter를 사용하여 split 대체함수 pyspark에서는 기존에 split함수를 이용하여 다음과 같이 여러 delimiter가 있을때 작업하면된다. import pyspark.sql.functions as F temp = 'hi|you:what's"up' F.split(temp, '\||:|"') => ['hi', 'you', 'what's', 'up'] 하지만 snowflake에서는 위처럼 작동하지않는다. 별도로 strtok라는 함수를 사용하여 split해줘야한다. temp = 'hi|you:what's"up' strtok(temp, "\||:|"") => ['hi', 'you', 'what's', 'up'] 요 몇일간 써보면서 느낀거지만, snowflake는 여러면에서 복잡한것같다...
[Pyspark] 'Detected implicit cartesian product for LEFT OUTER join between logical plans' 에러 해결방법 길게 안쓰겠다 다음과 같은 명령어면 해결된다 spark.conf.set( "spark.sql.crossJoin.enabled" , "true")
[Pyspark] pyspark Pipeline을 이용한 Data Normalizatioon Pyspark에서 Min Max Normalization을 처리할때, Pipeline이 없을 경우 다음과 같이 처리를 해야함 1. 먼저 처리하고싶은 column을 vector화 진행 2. vector화 모델에 fit처리 3. vector화된 column을 다시 MinMaxScaler 모델에 대입 4. MinMaxScaler에 대하여 Fit처리 하지만 Pipeline이 있을경우 두번씩 쓰였던 fit, transform문이 한줄로 줄여진다. -> 코드는 깔끔한게 최고! from pyspark.ml.feature import MinMaxScaler from pyspark.ml.feature import VectorAssembler from pyspark.ml import Pipeline try: df = Da..
[Pyspark] 차원축소 pyspark.ml.feature의 PCA 사용 [참조] spark.apache.org/docs/1.5.1/ml-features.html#pca Feature Extraction, Transformation, and Selection - SparkML - Spark 1.5.1 Documentation ML - Features This section covers algorithms for working with features, roughly divided into these groups: Extraction: Extracting features from “raw” data Transformation: Scaling, converting, or modifying features Selection: Selecting a subset from a l spa..
[Pyspark] sparkSQL groupBy concat사용하기 하나의 row에서 문자열들을 모두 합치는건 concat함수를 사용하면 되긴하지만, 가끔 여러 row에서 문자열들을 합쳐야할때가 있다. 그럴때 다음과 같은 코드를 활용하면 된다 select time, Id, concat_ws('', collect_list(item)) as concat from welcome group by 1,2 collect_list와, concat_ws 함수를 활용하면 SQL의 GROUP_CONCAT함수 효과를 맛볼수(?)있다.
[Pyspark] 비어있는 dataframe만들기 pyspark로 작업하다보면 비어있는 dataframe이 필요할때 있는데 간단히 정리 schema = StructType([StructField("UserId", StringType(), True)]) spark.createDataFrame([], schema).createOrReplaceTempView('Info') 이런식으로 StructField('컬럼명', 컬럼 타입, True) 로 설정한다. 여러 column일 경우엔 StructType안의 list에 콤마 구분으로 StructField를 늘리면된다
[Pyspark] pyspark에서 percentile사용하기 pyspark에서는 별도로 percentile함수가 없어보인다. 구글에 찾아봐도 결국엔 함수를 만들어서 사용하는것 같긴하지만... 혹시 함수가 있다면 댓글 부탁드립니다! 기존에는 percentile함수를 createOrReplaceTempView함수를 사용해서 SQL 테이블을 만들어서 SQL구문으로 percentile함수를 호출했는데. 이방식을 바로 pyspark에서 적용해보았다. temp.select('date', 'Country', 'User')\ .groupBy('date').agg(F.expr('percentile(User, 0.9)').alias('percentile90'))\ .show(100,False) 이런방식으로 pyspark에서도 바로 percentile함수를 사용 할 수가 있다
[Pyspark] pyspark 파일 저장 coalesce함수가 시간이 오래걸릴때 pyspark에서 계산된 데이터프레임을 json파일로 떨굴때 다음과 같은 코드를 쓰는데 result.coalesce(1).write.mode('overwrite').json(s3_address) 충분한 계산환경에서도 유독 시간이 엄청 오래걸렸다... 거의 30분~40분정도. 그래서 원인을 찾아보니 팀장님께서 다음과같은 링크를 보내주셨고 https://stackoverflow.com/questions/55712934/converting-the-dataframe-to-json-takes-lot-of-time 결론적으로 coalesce부분을 repartition으로 바꿔주면 해결된다 result.repartition(1).write.mode('overwrite').json(s3_address)