공식문서
spark.apache.org/docs/2.1.0/api/java/overview-summary.html
스파크 날짜 관련 함수
medium.com/expedia-group-tech/deep-dive-into-apache-spark-datetime-functions-b66de737950a
참고할 자료 wikidocs.net/book/1686
다양한 연산 eyeballs.tistory.com/84
java.time.LocalDateTime 형식
LocalDate.now(); // 오늘
LocalDateTime.now(); // 지금
LocalDate.of(2015, 4, 17); // 2015년4월17일
LocalDateTime.of(2015, 4, 17, 23, 23, 50); // 2015년4월17일23시23분50초
Year.of(2015).atMonth(3).atDay(4).atTime(10, 30); // 2015년3월4일 10시30분00초
출처: https://jekalmin.tistory.com/entry/자바-18-날짜-정리 [jekalmin의 블로그]
컬럼 추출 및 제거
df.select("id", "name")
df.drop("id", "name")
가로병합 : 조인
df1.join(df2, df1["id"] == df2["c_id"])
//df1의 id를 기준으로 df2의 c_id가 동일하도록 df1과 df2가 합쳐진다.
df1.join(df2, "기준컬럼", "inner")
//컬럼이름동일
dfA.join(dfB, dfA("") === dfB("") and dfA("") === dfB(""),"left_outer")
//여러조건
inner, outer, left_outer, right_outer, full, 세미.. 등
세로병합 : union
union(DF) //중복 제거
union all //중복도 그대로
createOrReplaceTempView : spark.sql로 SQL을 사용해서 데이터를 관리 가능하게 해준다.
df.createOrReplaceTempView("product")
start_df = spark.sql("SELECT category, product, revenue \
FROM product \
ORDER BY category, revenue DESC")
파일읽기
spark.read.-()
.toDF()
문자열 형태변환
// ① 뒤집기
val reverse = "Scala".reverse
println(s"① $reverse")
// ② 첫글자를 대문자로
val cap = "scala".capitalize
println(s"② $cap")
// ③ 7번 반복
val multi = "Scala! " * 7
println(s"③ $multi")
// ④ 정수로 변환
val int = "123".toInt
println(s"④ $int")
.filter (===, &&, |||, col(""))
.filter(col("a")==="148" && col("b")==="148" && col("c")==="1" && col("d")==="30"
||| col("d")==="31" ||| col("d")==="32"
날짜
LocalDateTime, TimeStamp
컬럼 이름 변경
df.withColumnRenamed(“ ”, “ ”)
column.alias(" ")
컬럼 추가
withColumn(“ ”, 표현식)
에러 : value count is not a member of Unit
.toDF() 해주기
날짜 형태 바꾸기 (date_format)
.withColumn("", date_format(expr(" + INTERVAL 9 HOURS"), "yyyy-MM-dd"))
//yyyy-mm-dd 하면 mm에 왠 분이 들어감
//yyyy-MM-dd 해야 03처럼 이쁘게 나옴
날짜 filter
날짜 차이 구하기 (Datediff)
datediff("Start_Date","End_Date")
//근데 타입 안맞는다하면 to_date(col("컬럼")) 해줘야 코드가 돌아감
순서
.withColumn("rank", rank().over(Window.partitionBy().orderBy())) //1 1 3
.withColumn("dense_rank", dense_rank().over(Window.partitionBy().orderBy())) //1 1 2
.withColumn("row_number", row_number().over(Window.partitionBy().orderBy())) //1 2 3
현재시간추출
current_date() y-m-d
current_timestamp() y-m-d h:m:s
first
www.python2.net/questions-907614.htm
컬럼끼리 연산
ddd.withColumn("res", $"order"+$"diff")
행 값들 리스트로 모으기
collect_list(" ")
이전/이후 행 가져오기 wikidocs.net/14631
lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date))
lead()
컬럼 변수 넣을때
'column = "column" ? 코드는 잘 돌아감
null을 0으로
DF.na.fill("0", Seq("컬럼"))
string 넣을때 (lit) withColumn에?
.withColumn("first_logindate", to_date(lit("2021-03-22")))
//lit가 중요! (F.lit도 있음)
split
dㅇ
스파크메모리확장
spark.conf.set("spark.rpc.message.maxSize", 256)
Seq : 여러 컬럼을 한 묶음으로 넣어야할때
dfA.join(dfB, Seq("A","B","C"), "left_outer")
groupBy로 리스트 만들때 특정 컬럼 순서로 리스트 만들기
val w = Window.partitionBy("A","B").orderBy("S")
val a = df_conti.withColumn("column_list", collect_list("column").over(w))
val month4 = a..groupBy("A","B").agg(max("column_list"))
//A,B기준 groupby / S기준 정렬
오름차순, 내림차순
orderBy($"column_name".desc)
.cast("date") (형태변환인듯)
lit({date_str}).cast("date")
스칼라 데이터프레임 편하게 (selectexpr사용)
localdate 여러 출력
localdate 계산
.minusDays(2)
filter 안에 변수넣을때
.filter(col("a") >= valA and col("b") <= valB)
column형태 시간차 초로 구하기 www.javaer101.com/ko/article/26160774.html
.withColumn("term", col("end").cast("long") - col("start").cast("long") )
컬럼내 값 변경하고 싶을때
DF.withColumn("이미존재하는컬럼이름 그대로", 조건)
for문
for (a <- 0 to 100)
{
}
문자열안에 변수넣기
"${변수}"
리스트앞에추가, 뒤에추가
리스트::a //앞
리스트:+a //뒤
이름 다른 컬럼 join
aa.as("left").join(prodRows.as("right"),
$"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
코드
- $"컬럼" ?????
- ==말고 ===
컬럼 존재 확인 boolean
Try(df("column")).isSuccess
컬럼 고정해서 파일 읽기
val schem = a.schema
spark.read.schema(schem).parquet("")
배열에 변수 포함 여부
var a = 1
var arr = Array(1, 2)
if (arr.contains(a) ){
print("A\n\n")
}
'Spark > Scala' 카테고리의 다른 글
[Scala] 조건문 == 실수 (0) | 2021.11.26 |
---|