Pomôžte rozvoju stránky a zdieľajte článok s priateľmi!

Úvod do Spark Transformations

Transformácia je funkcia, ktorá vracia nový RDD úpravou existujúcich RDD. Vstupné RDD sa nemení, pretože RDD sú nemenné. Všetky transformácie vykonáva Spark lenivým spôsobom - výsledky nie sú vypočítané okamžite. Výpočet transformácií prebieha iba vtedy, keď sa na RDD vykoná určitá akcia.

Typy transformácií v Spark

Sú všeobecne kategorizované do dvoch typov:

  • Narrow Transformation: Všetky údaje potrebné na výpočet záznamov v jednom oddiele sa nachádzajú v jednom oddiele nadradeného RDD. Vyskytuje sa v prípade nasledujúcich metód:

map(), plochá mapa(), filter(), sample(), union() atď.

  • Široká transformácia: Všetky údaje potrebné na výpočet záznamov v jednom oddiele sa nachádzajú vo viac ako jednom oddiele v nadradených RDD. Vyskytuje sa v prípade nasledujúcich metód:

distinct(), groupByKey(), reductionByKey(), join() , repartition() atď.

Príklady iskrových transformácií

Tu rozoberáme príklady uvedené nižšie.

1. Úzke premeny

  • map(): Táto funkcia berie funkciu ako parameter a aplikuje túto funkciu na každý prvok RDD.

Kód:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(10,15,50,100))
"println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))

Výstup:

Vo vyššie uvedenej metóde MAP pridávame každý prvok o 10 a to sa odráža vo výstupe.

  • FlatMap(): Je podobná mape, ale môže generovať viacero výstupných položiek zodpovedajúcich jednej vstupnej položke. Funkcia teda musí vrátiť sekvenciu namiesto jednej položky.

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split(:))
"rddNew.foreach(x=>print(x+ ))

Výstup:

Táto funkcia odovzdaná ako parameter rozdeľuje každý vstup na „:“ a vracia pole a metóda FlatMap pole vyrovnáva.

  • filter(): Berie funkciu ako parameter a vracia všetky prvky RDD, pre ktoré funkcia vracia hodnotu true.

Kód:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))

Výstup:

Vo vyššie uvedenom kóde používame reťazce, ktoré neobsahujú slovo „test“.

  • sample(): Vráti zlomok údajov, s náhradou alebo bez nej, pomocou daného zdroja generátora náhodných čísel (toto je však voliteľné).

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))

Výstup:

V kóde vyššie dostávame náhodné vzorky bez výmeny.

  • union(): Vráti spojenie zdrojového RDD a RDD odovzdaného ako parameter.

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5))
val rdd2=sc.parallelize(Array(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Výstup:

Výsledný RDD rddUnion obsahuje všetky prvky z rdd a rdd2.

2. Široké premeny

  • distinct(): Táto metóda vracia odlišné prvky RDD.

Kód:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(1,1,3,4,5,5,5))
"println(Základný RDD je:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD po použití metódy MAP:)
""rddNew.foreach(x=>print(x+ ))

Výstup:

dostávame na výstupe odlišné prvky 4,1,3,5.

  • groupByKey(): Táto funkcia je použiteľná pre párové RDD. Párový RDD je ten, ktorého každý prvok je n-tica, kde prvý prvok je kľúč a druhý prvok je hodnota. Táto funkcia zoskupuje všetky hodnoty zodpovedajúce kľúču.

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))

Výstup:

Podľa očakávania sú všetky hodnoty pre kľúče „a“ a „b“ zoskupené.

  • reduceByKey(): Táto operácia je použiteľná aj pre párové RDD. Agreguje hodnoty pre každý kľúč podľa dodanej metódy redukcie, ktorá musí byť typu (v,v)=v.

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))

Výstup:

Vo vyššie uvedenom prípade sčítavame všetky hodnoty kľúča.

  • join(): Operácia spojenia je použiteľná pre párové RDD. Metóda spojenia kombinuje dve množiny údajov na základe kľúča.

Kód:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd1=sc.parallelize(Array((key1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelize(Array((key2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD po pripojení:)
""rddJoined.foreach(x=>print(x+ ))

Výstup:

  • repartition(): Preskupuje dáta v RDD náhodne do počtu oddielov odovzdaných ako parameter. Môže zväčšiť aj zmenšiť oddiely.

Kód:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=new SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelize(Array(1,2,3,4,5,10,15,18,243,50),10)
"println(Oddiely pred: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
"println(Oddiely po: +rddNew.getNumPartitions)"

Výstup:

Vo vyššie uvedenom prípade zväčšujeme oblasti z 10 na 15.

Pomôžte rozvoju stránky a zdieľajte článok s priateľmi!