Pomôžte rozvoju stránky a zdieľajte článok s priateľmi!
Úvod do Spark Transformations
Transformácia je funkcia, ktorá vracia nový RDD úpravou existujúcich RDD. Vstupné RDD sa nemení, pretože RDD sú nemenné. Všetky transformácie vykonáva Spark lenivým spôsobom - výsledky nie sú vypočítané okamžite. Výpočet transformácií prebieha iba vtedy, keď sa na RDD vykoná určitá akcia.
Typy transformácií v Spark
Sú všeobecne kategorizované do dvoch typov:
- Narrow Transformation: Všetky údaje potrebné na výpočet záznamov v jednom oddiele sa nachádzajú v jednom oddiele nadradeného RDD. Vyskytuje sa v prípade nasledujúcich metód:
map(), plochá mapa(), filter(), sample(), union() atď.
- Široká transformácia: Všetky údaje potrebné na výpočet záznamov v jednom oddiele sa nachádzajú vo viac ako jednom oddiele v nadradených RDD. Vyskytuje sa v prípade nasledujúcich metód:
distinct(), groupByKey(), reductionByKey(), join() , repartition() atď.
Príklady iskrových transformácií
Tu rozoberáme príklady uvedené nižšie.
1. Úzke premeny
- map(): Táto funkcia berie funkciu ako parameter a aplikuje túto funkciu na každý prvok RDD.
Kód:
"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(10,15,50,100))
"println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))
Výstup:
Vo vyššie uvedenej metóde MAP pridávame každý prvok o 10 a to sa odráža vo výstupe.
- FlatMap(): Je podobná mape, ale môže generovať viacero výstupných položiek zodpovedajúcich jednej vstupnej položke. Funkcia teda musí vrátiť sekvenciu namiesto jednej položky.
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split(:))
"rddNew.foreach(x=>print(x+ ))
Výstup:
Táto funkcia odovzdaná ako parameter rozdeľuje každý vstup na „:“ a vracia pole a metóda FlatMap pole vyrovnáva.
- filter(): Berie funkciu ako parameter a vracia všetky prvky RDD, pre ktoré funkcia vracia hodnotu true.
Kód:
"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))
Výstup:
Vo vyššie uvedenom kóde používame reťazce, ktoré neobsahujú slovo „test“.
- sample(): Vráti zlomok údajov, s náhradou alebo bez nej, pomocou daného zdroja generátora náhodných čísel (toto je však voliteľné).
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))
Výstup:
V kóde vyššie dostávame náhodné vzorky bez výmeny.
- union(): Vráti spojenie zdrojového RDD a RDD odovzdaného ako parameter.
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5))
val rdd2=sc.parallelize(Array(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))
Výstup:
Výsledný RDD rddUnion obsahuje všetky prvky z rdd a rdd2.
2. Široké premeny
- distinct(): Táto metóda vracia odlišné prvky RDD.
Kód:
"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(1,1,3,4,5,5,5))
"println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))
Výstup:
dostávame na výstupe odlišné prvky 4,1,3,5.
- groupByKey(): Táto funkcia je použiteľná pre párové RDD. Párový RDD je ten, ktorého každý prvok je n-tica, kde prvý prvok je kľúč a druhý prvok je hodnota. Táto funkcia zoskupuje všetky hodnoty zodpovedajúce kľúču.
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))
Výstup:
Podľa očakávania sú všetky hodnoty pre kľúče „a“ a „b“ zoskupené.
- reduceByKey(): Táto operácia je použiteľná aj pre párové RDD. Agreguje hodnoty pre každý kľúč podľa dodanej metódy redukcie, ktorá musí byť typu (v,v)=v.
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))
Výstup:
Vo vyššie uvedenom prípade sčítavame všetky hodnoty kľúča.
- join(): Operácia spojenia je použiteľná pre párové RDD. Metóda spojenia kombinuje dve množiny údajov na základe kľúča.
Kód:
"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd1=sc.parallelize(Array((key1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelize(Array((key2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD po pripojení:)
""rddJoined.foreach(x=>print(x+ ))
Výstup:
- repartition(): Preskupuje dáta v RDD náhodne do počtu oddielov odovzdaných ako parameter. Môže zväčšiť aj zmenšiť oddiely.
Kód:
"val conf=new SparkConf().setAppName(test).setMaster(local)"
"println(Oddiely po: +rddNew.getNumPartitions)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,10,15,18,243,50),10)
"println(Oddiely pred: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
Výstup:
Vo vyššie uvedenom prípade zväčšujeme oblasti z 10 na 15.