2016. október 19., szerda

Go konkurencia kezelés gyorstalpaló

Mikor elkezdtem ezt a blogot írni, érdeklődésem előterében a Java alapú technológiák voltak. Rengeteg téma merült fel az eltelt 7 évben, amiről szívesen írtam volna, de mivel nem kapcsolódtak szorosan az eredeti tematikához, ezek a bejegyzések sosem kerültek implementálásra. Szakítva ezzel a (rossz) hagyománnyal szeretnék az informatika világának szélesebb spektrumával foglalkozni ezen túl. Első Java mentes bejegyzésem témájának pedig a Go nyelv konkurencia kezelését választottam.

A Go nyelv körül elég nagy a sürgés-forgás mostanában, nem tudom pontosan miért örvend ekkora népszerűségnek. Számtalan jó tulajdonsága van, de szerintem mind közül a legfontosabb szempont, hogy a nyelv egyszerű. Nincsenek mögötte "bonyolult" ideológiák mint például az objektum orientált programozásban, vagy nem lehet benne egy dolgot száz féleképpen csinálni mint mondjuk a funkcionális nyelvekben. Kutyaközönséges, de modern procedurális nyelv annak minden előnyével és hátrányával. Akit részletesebben érdekel ez a terület érdemes Rob Pike előadását meghallgatni. Témánkra rátérve ugyanez az egyszerűség jellemzi a Go konkurencia kezelését is. Vannak az un. goroutinok, ezek segítségével lehet párhozamosítani a végrehajtást, és vannak a chanelek, amiken keresztűl zajlik a kommunikáció. Utóbbiból van szinkron meg aszinkron.

A program létrehoz egy c szinkron chanelt, majd elindít egy goroutint, és kiolvassa a chanleből az értéket, amit a goroutin beletesz. Mivel szinkron módban működik egyik végrehajtás sem megy tovább amíg a kommunikáció meg nem történik. Aszinkron channelt a méretének megadásával tudunk létrehozni. Kész is, mindent tudunk :) Akit részletesebben érdekel, mi történik a motor-háztető alatt, annak ajánlom a Go memória kezeléséről szóló cikket. Természetesen ennyivel nem ússzuk meg a dolgot, komolyabb alkalmazásnál szükségünk van meg egy pár dologra, hogy biztosítani tudjuk programunk helyes működését. Két csomagot fogunk felhasználni, az egyik a sync, ami szinkronizációs problémákra nyújt megoldást, illetve a sync/atomic, melynek segítségével elemi műveleteket végezhetünk. A Go nyelv dokumentációját olvasva számtalan helyen olvashatjuk, hogy nincs garancia, hogy adott dolog hogyan viselkedik mikor több végrehajtó szál birizgálja egy időben (nem 1 az 1-hez a goroutine és szál leképzés, de ettől most tekintsünk el). Alapvetően érték szerint adódik át minden, így a legtöbb esetben nincs is szükség különösebb szinkronizációra, de ha mégis, magunknak kell gondoskodni róla.

Kitaláltam egy egyszerű kis feladatot, amin keresztűl bemutatok pár fontos eszközt működés közben.
  • Legyen egy szál típus, ami 1000 véletlen számot generál, és a legnagyobbat elküldi feldolgozásra.
  • Egy másik az előzőtől kapott értékből kiindulva generál még 500 véletlen számot, a legnagyobbat gyűjti egy tömbbe és tovább küldi.
  • A harmadik feldolgozó nem csinál mást, mint számolja, hogy a kapott érték hány esetben volt az utolsó a tömbben.
  • Szabályozható legyen a goroutinok száma, a generálóból hány példány futhat egyidőben, valamint hány értéket generáljon összesen.
  • Mérjük a végrehajtás idejét a generálás kezdetétől.
A feladatnak semmi értelme, és biztos egyszerűbben is meg lehetett volna oldani, de igyekeztem minél több módszert belezsúfítani a megoldásba.

Első lépésben inicializáljuk a konstansokat, valamint az eredmény tömböt. Mivel az eredménnyel több szál is fog egy időben dolgozni szükségünk lesz valami zárolási logikára. Tekintettel arra, hogy az egyik szál írja a másik pedig olvassa én a sync.RWMutex-et választottam.

A következő kódblockban implementálásra kerül a statisztikázó egység.

Majd megírjuk a feldolgozó egységet.

Na most kezd igazán érdekes lenni a dolog. A generátor esetében tudni kell szabályozni, hogy hány darab fut egyidőben. A legkézenfekvőbb döntés, ha létrehozunk egy chanelt adott méretben, és beleteszünk egy értéket amikor indítunk egy új generátort, és kiveszünk egyet ha végzett a generátor. Ennek eredméyeképpen amikor megtelik a chanel programunk várakozik.

Miután elindítottuk a generáló szálakat szeretnénk megvárni amíg az összes végez. Ennek érdekében bevezetünk egy sync.WaitGroup-ot.

Egy utolsó dolog maradt a generátorral kapcsolatban, hogy szeretnénk az eltelt időt mérni a lehető legközelebb a generáláshoz. Ennek biztosítására én a sync.Once-ot választottam, ami garantálja ezt a működést.

Nincs más dolgunk mint megvárni és kiírni az eredményt.

A teljes forráskód elérhető itt.

Programunk gyönyörűen fut, de vajon mi történik a háttérben? Fordítsuk le a programot a Go beépített verseny helyzet elemzőjével.
go build -race
Ez után futtatva valami hasonló kimenetet kapunk:
==================
WARNING: DATA RACE
Read at 0x00c42000c2e8 by main goroutine:
  main.main()
      /Users/rkovacs/asd/src/asd/main.go:92 +0x731

Previous write at 0x00c42000c2e8 by goroutine 7:
  sync/atomic.AddInt32()
      /usr/local/Cellar/go/1.7.1/libexec/src/runtime/race_amd64.s:269 +0xb
  main.main.func1()
      /Users/rkovacs/asd/src/asd/main.go:37 +0x191

Goroutine 7 (running) created at:
  main.main()
      /Users/rkovacs/asd/src/asd/main.go:42 +0x31b
==================
time spent: 1121ms
last:not ratio: 499:1
Found 1 data race(s)
Futás idejű analízisből kiderült, hogy volt értelme elemi műveletben növelni a számlálót, hiszen verseny helyzet alakult ki, és máskülönben megjósolhatatlan eredménnyel zárult volna a futás.

A téma még nem merült ki ennyivel, de remélem sikerült egy gyors képet adnom a Go konkurencia kezeléséről. Ajánlom továbbá figyelmetekbe a Concurrency is not parallelism és a Go concurrency patterns előadásokat.

2016. szeptember 30., péntek

Üzenet hitelesítése Java és Go szervizek között

Java fejlesztés mellett időm egy részét Go programozással töltöm, és egy olyan feladaton volt szerencsém dolgozni, amely mindkét platformot érintette. Napjaink modern alkalmazásai kisebb szervízekre vannak bontva, és igen gyakori, hogy az egyes szervízek eltérő technológiával kerülnek implementálásra. Konkrét esetben az volt az elvárás, hogy a szervízek közti kommunikációt aláírással hitelesítsem, a küldő fél Javaban, míg a fogadó Goban írodott. Mivel nem valami egzotikus kérést kellett megvalósítani gondoltam másoknak is hasznos lehet a megoldás. Előljáróban még annyit kell tudni a rendszer architektúrájáról, hogy a Java kód indít virtuális gépeket, és az ezeken a gépeken futó Go szolgáltatáson keresztül végez beállítási műveleteket, ráadásul mindkét komponens nyílt forráskódú. Ezen két adottságból adódóan nem volt mód sem szimetrikus titkosítást használni, vagy egyéb más érzékeny adatot eljuttatni a futó virtuális gépre, sem pedig valami közös "trükköt" alkalmazni. Maradt az aszinkron kulcspárral történő aláírás, mi az RSA-t választottuk. Nem is szaporítanám a szót, ugorjunk fejest a kódba.

Kezdjük a fogadó féllel. A Go nyelv dokumentációját olvasva hamar ráakadhatunk, hogy létezik beépített crypto/rsa csomag. Nem bővelkedik a lehetőségekben, ugyanis csak PKCS#1-et támogat. Remélem nem spoiler, de a Go lesz a szűk keresztmetszet választható sztenderdek közül. Létezik persze külső csomag pl. PKCS#8 támogatással, de mi a biztonsági kockázatát kisebbnek ítéltük a beépített bár gyengébb eljárásnak, mint a külső kevesek által auditált megoldásnak. A crypto/rsa csomagnál maradva az egyetlen lehetőségünk, hogy PSS (Probabilistic signature scheme) aláírásokat hitelesítsünk a VerifyPSS metódussal. Szóval nincs más dolgunk mint az RSA kulcspár publikus részét eljuttatni a virtuális gépre, és már mehet is a hitelesítés.


Küldés során a kérés teljes törzsét írtuk alá, így nincs más dolgunk mint a kérésből kibányászni a törzset és ellenőrizni a hitelességét.

Valamint implementálni és regisztrálni a kérés feldolgozót.

Természetesen tesztet is írtam az aláírás ellenőrzésére.

Miután megvagyunk a hitelesítéssel jöhet az aláírás Java oldalon. Kutattam egy darabig hogyan lehet PSS aláírást Java SE-vel generálni, de mivel a projektünknek már része volt a Bouncy Castle Crypto API, így kézenfekvő volt, hogy azt használjam fel.

A Java oldali kulcspár generálással tele van az internet, azzal nem untatnák senkit.

2016. január 23., szombat

Big Data trendek 2016

Remélem nem vagyok még nagyon elkésve egy ilyen témájú bejegyzéssel, csak most kezdődik az év, úgy gondolom belefér egy kis jövendő mondás. Nem kell megijedni, nem a kristály-gömbömből fogok olvasni, hanem a szakma neves képviselőinek jóslatait igyekszem közvetíteni. A Budapest Big Data Meetup idei első találkozója alkalmából a szervezők három embert kértek fel, hogy osszák meg gondolataikat a közösséggel, én pedig szeretném összefoglalni az ott hallottakat. Mielőtt az előadások elkezdődtek volna volt pár köz érdekű információ, melyek közül szerintem a legfontosabb, hogy megalakult a Budapest Spark Meetup. Csatlakozzatok, terjesszétek, stb.

Az első előadást Rátky Gábor tartotta "Climbing the slope of enlightment" címmel. Rövid bemutatkozás után felvázolta, hogy szerinte pontosan a hype ciklus melyik szakaszában jár a "big data", őszintén remélem igaza van Gábornak, mert meglátása alapján már felfelé mozgunk a völgyből, és lassan megtaláljuk helyét a világban a technológiáknak.
A figyelmet érdemlő technológiák közül elsőként az Amazon EMR 4-es verzióját említette meg, ami gyakorlatilag egy Hadoop as a Service (HaaS) szolgáltatás, ahol az infrastruktúrárol az Amazon gondoskodik, a fejlesztőnek csak a feladatok megírásával, futtatásával kell törődnie, meg persze a számla kiegyenlítésével hónap végén. Legfontosabb tulajdonságainak az alábbiakat gyűjtötte össze:
  • Az EMR a legnagyobb Hadoop distribúció piaci részesedés alapján
  • Nagyon jó a többi Amazon szolgáltatással az integráció, mint például EC2, S3 vagy Redshift
  • Apache Bigtop-re épül
  • Könnyen lehet ideiglenes (ephemeral) fürtöket létrehozni és megszüntetni
  • Intelligens skálázhatóság
  • Spark 1.5 támogatással rendelkezik, és gyorsan adoptálják az újabb verziókat
  • Az EMR homokozóban friss technológiákat lehet kipróbálni mint az Apache Zeppelin
  • Igazán azoknak ajánlott, akik egyéb Amazon szolgáltatásokat is használnak, vagy nem okoz problémát elköteleződni egy beszállító felé
Megkerülhetetlen szereplő az Apache Spark, ha 2016-os trendekről beszélünk, Gábor sem hagyhatta figyelmen kívűl. Csapatuk, a Secure Sauce Partners is aktívan használja, dübörögő technológia, sok területen hódít, és nem méltán örvend ekkora népszerűségnek. Egy mondatban összefoglalva a Spark egy olyan keretrendszer, amivel elosztott alkalmazásokat lehet fejleszteni, és megfelelő programozással az elosztott futtatásról a Spark gondoskodik. Első pontnak is ezt emelte ki, de lássuk mi volt a többi:
  • Ugyanaz a kód fut az egy gépestől a sok gépes rendszerig, "Write once, run anywhere"
  • Hatalmas lendülettel fejlődik, köszönhetően a felhajtásnak is ami körülveszi
  • Szaporodnak a Spark primitívekre épülő keretrendszerek, mint például DataFrame, MLLib
Gábor személyes kedvencére tért még ki az előadásában az Apache Zeppelinre, ami egy hiánypótló, web alapú, interaktív adat elemző eszköz. A projekt jelenleg az Apache inkubátorban fejlődik roham léptekkel. Gábor véleménye szerint az egész adat analízis eddig a programozói interfészeken keresztűl történt, történik, és ő reméli, hogy a 2016-os év az ökoszitéma köré épülő felhasználói felületek éve lesz. Ennek a területnek egyik úttörője a Zeppelin, mert:
  • Feladatoknak 80%-a az adat munging, és ezt (is) könnyíti meg a projekt
  • Legjobb alkalmazás csoportban adat-halmazokon dolgozni
  • Egyszerű(bb)en elvégezhető vele az adat-halmazok definiálása, a lépések és az eredmény megosztása
  • REPL-ben jártas adat-elemzőknek otthonos környezet
Végül pár pontban összeszedte mit vár a 2016-os évtől:
  • Konszolidáción mennek keresztül az eszközök, és így az egész ökoszisztéma
  • Az SQL alapú megoldások erősödni fognak
  • A már említett kiegészítő eszközök sokasodnak
  • +1-nek pedig az IoT
A következő előadást a RapidMiner képviseletében Prekopcsák Zoltán tartotta. Sajnos írásos anyag híján csak az emlékezetemre támaszkodhatok, hogy miről is beszélt pontosan.

Téma volt a Spark, aktív felhasználói a keretrendszernek, és két fontos dolgot is mondott ezzel kapcsolatban:
  • Reméli a hiba javítás nagyon nagy fókuszt kap a jövőben, mert véleménye szerint ezen a területen nagyon sok munka van
  • Az a tapasztalata, hogy sok vállalat, akik letették a voksukat a Spark mellett, nem félnek frissíteni a rendszert, és bátran ugranak bele egy-egy újabb verzióba
Zoltán szerint is az SQL technológiák nagyobb térnyerése lesz jellemző az évre, valamint a biztonságot - mind adat, mind rendszer szinten - célzó fejlesztések lesznek a közép pontban.

Harmadikként Mátyás János adott elő a Hortonworks színeiben. A Hortonworks egy teljesen nyílt forrású Hadoop disztribúció, a HDP fejlesztésére és támogatására szakosodott szilícium-völgyi vállalat. A szokásos bemutatkozás után János összefoglalta miről is szólt a 2015-ös év big data szempontból:
  • Az Apache Spark éve volt
  • Az SQL még mindig nagy dolog, miért is dobnánk ki az ablakon több évtizednyi tudást
  • Az adatok mozgatása a Hadoop fürtökbe nagy kérdés volt/lesz, és ez adott lét jogosultságot olyan alkalmazásoknak mint az:
A 2016-os előrejelzések terén rengeteg tényezőt kell figyelembe venni, amit külön nehezít, hogy a HDP több mint 22 Apache projektet ölel körbe, ezért János inkább technológiai szempontból szedte össze az érdekesebb trendeket.

Elsőként az adatok tárolása körüli munkákat emelte ki mint például az Erasure Coding a HDFS fájlrendszerben és az archív adatokat tároló rendszerek támogatása. Mindkettő fejlesztésnek a költség hatékonyság a fő mozgató rugója.

A HDFS az EC bevezetése előtt csak teljes blokk replikációt támogatott, ami azt jelenti, hogy a replikációs faktornak megfelelő darab számban minden adat tárolásra került. Az újításnak köszönhetően alap értelmezetten 6 adat blokkhoz 3 paritás blokk tartozik. Ennek hála tárhelyet spórol az eljárás, és megnöveli az írási sebességet, miközben a hiba tűrés ugyanúgy megmarad. Amíg az EC szoftveresen javítja a tároló kapacitás kihasználását, addig a másik fejlesztés hardveresen igyekszik a költségek minimalizálására. A hozzáférés gyakorisága szerint három csoportba rendezi az adatokat, és a ritkán használt, régi adatokat olcsóbb, de lassabb tárolókra lehet szervezni.

Következőkben az adat trendekről beszélt az előadó, úgy mint:
  • A modern alkalmazásokban az adatok elemzése elsődleges funkció
  • A gyakorlat azt mutatja, hogy a szolgáltatásokat és adat elemző rétegeket együttesen használják
    • Spark + HBase
    • Flume + Storm + Hive + Tableau
  • Alapvető elvárások ezen a területen:
    • Könnyen felhasználható legyen és könnyen lehessen menedzselni
    • Biztonságos legyen
    • Reprodukálható legyen
Ezután az előadó a YARN fontosságát emelte ki, a YARN.next már a hagyományos konténerek mellett Docker konténerek használatát is támogatja, és ezzel lehetővé vált, hogy ne csak Hadoop komponensek futását kezelje a YARN, hanem bármilyen más konténerizált alkalmazásét. Ilyen alkalmazások lehetnek a Tomcat, MySQL, vagy akár saját alkalmazásaink. Ez egy nagyon fontos lépés volt, hiszen ezzel az újítással fürtjeinket nem csak Hadoop műveletekre tudjuk használni, hanem gyakorlatilag a YARN-on keresztűl teljes erőforrás menedzsmentet kapunk.

A YARN után a Spark került elő, "Miért szeretjük a Sparkot a Hortonworksnél" címmel. Nekem talán a legfontosabb üzenet az volt ezzel a témával kapcsolatban, hogy ha már a Spark "YARN ready", és az alkalmazásunk egyébként is kapcsolódik más (Hadoop) szolgáltatásokkal, akkor nyugodtan bízzuk a YARN-ra a Spark erőforrások kiosztását is, mert meghálálja magát a dolog. De mi is jellemzi a Sparkot:
  • Elegáns fejlesztői API
    • DataFrame-k
    • Gépi tanulás
    • SQL
  • Data sience-eknek készült
    • A alkalmazások kiszámíthatóan tudnak skálázódni, és megfelelően lehet őket tagolni
  • Demokratizálja a gépi tanulást
    • A Spark azt csinálja a gépi tanulással a Hadoopon belül, amit a Hive csinált az SQL-el
  • Közösség
    • Széles körben fejlesztik, és nagy érdeklődés övezi, nem csak a fejlesztők irányából
  • Realizálja az adat operációs rendszerek értékét
    • Kulcs fontosságú eszköz a Hadoop érában
A 2016-os évben nagy erőkkel fog zajlani a Spark mélyebb integrációja a Hadoop ökoszisztémával, számtalan fejlesztésre kitért az előadó (a teljesség igénye nélkül):
  • Szorosabb RDD és HDFS együtt működés
  • Dinamikus végrehajtás data-locality alapján
  • Atlas integráció
  • Hbase konnektor
  • NiFi stream-ek
  • Biztonsági fejlesztések
    • SparkSQL Security
    • Wire Encryption
  • Spark fejlesztés és vizualizáció Zeppelinben
Utolsó témakörként a biztonság került még szóba. Erről a témáról nem lehet eleget beszélni, és rengeteg fronton helyt kell állni. Négy főbb projektet emelt ki az előadó, melyek együttes használatával fel lehet adni a leckét a támadónak, és az adatok belső védelme is megoldható.

Az Apache Ranger egy központosított biztonsági keretrendszer, ami az alábbiakról gondoskodik:
  • Authorizáció
  • Authentikálás
  • Audit
  • Adat titkosítás
  • Biztonsági beállítások kezelése
Az Apache Knox a HTTP és REST interfészeket hivatott megvédeni. Egy Hadoop fürt védelmében kulcs szerepet játszik, hogy a tűzfalon minden befelé irányuló porton tiltani kell az adat forgalmat, elzárva a rendszert a külvilágtól. Amennyiben felhasználóink mégis szeretnének a rendszerhez hozzáférni, a Knox egy biztonságos átjárót biztosít külső hálózatok felé.
  • Egyszerűsíti a hozzáférést, mert elfedi a Kerberost és nem szükséges annak telepítése a kliens gépeken
  • Fokozza a biztonságot azáltal, hogy SSL csatornán kommunikál
  • Központosítja a vezérlést, mivel lehet benne irányítani a kéréseket akár több Hadoop fürt felé
  • Nagy vállalati integrációt biztosít LDAP és Active Directory támogatással
Szóba került az előadásban az Apache Atlas projekt is, ami egy integrációs keretrendszer, és nagy vállalati rendszereket segít Hadoop-pal összekötni, valamint az Apache Metron, amely jelenleg még inkubátor projekt, és biztonsági analízist végez, ami alapján értesítéseket tud küldeni.

Az előadása végén János pár szóban beszélt a Hortonworks szárnyai alatt Magyar országon fejlesztett HaaS megoldásról a Cloudbreakről. A támogatott felhő platformok között nem kisebb nevek szerepelnek mint az Amazon AWS, Google GCE, Microsoft Azure, vagy az OpenStack. A Cloudbreak lényegében minimális felhasználói interakcióval gondoskodik az infrastruktúra beüzemelésétől, az Apache Ambari fürt beállításán át, a teljes Hadoop környezet felépítéséig mindenről.

Az előadások végeztével megkezdődött a kötetlen beszélgetés, pizza sör ilyenek :). Sokan részt vettek a meetupon, és jó volt látni, hogy ennyi embert foglalkoztatnak az idei trendek. Ha idáig eljutottál, valószínűleg te is közéjük tartozol, ne habozz megosztani a véleményed!

2015. június 5., péntek

Konténerezett Hadoop és Cassandra cluster konfigurálása - harmadik rész

A sorozat előző részeiben (1, 2) Vagrantos környezetben felépítettünk egy Hadoop clustert. Ebben a befejező cikkben egy Cassandra fürtöt fogunk telepíteni, majd egy map/reduce jobot futtatunk a teljes clusteren. Izgalmasan hangzik, vágjunk is bele.

Előkészítés

Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra, és építsük újra a konténereket:
cd docker-cassandra && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd ../hadoop-docker && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd .. && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
Élesszük fel a gépeket:
vagrant halt && vagrant up
Majd a futó Vagrantos környezetben építsük újra a konténereket:
vagrant provision master slave1 slave2 slave3
Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)
Ha mindezzel végeztünk, hozzuk létre ismét a Weave hálózatot és a Swarm clustert, a Hadoop konténereket egyelőre hagyjuk parlagon.

Változások

Csökkentettem a master nevű gép memória igényét, mivel a továbbiakban csak mesterként fog szolgálni a benne futó Hadoop konténer, és ezzel egy időben növeltem a slaveX nevű gépek memóriáját, mert az eddigi beállítás ki volt hegyezve a Hadoopra, mostantól viszont a Cassandrának is kell helyet szorítani. A fejlesztés előrehaladtával a gépemben lévő 8 Gb RAM már sokszor kevésnek bizonyult, elkezdte a Vagrant aktívan használni a swapet, ami igen rossz hatással van a jobok futtatására, lépten nyomon elhasaltak. Én átmenetileg a 3-s számú slave gépet kikapcsoltam. Összességében 4 virtuális gépből és 15 konténerből áll a cluster, szóval személy szerint nem is csodálkozom, hogy ilyen mértékben megnövekedett a gép igény.

bootstrap.sh
if [ -n "$MASTER_IS_SLAVE_TOO" ]; then
    echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves
else
    echo "" > $HADOOP_PREFIX/etc/hadoop/slaves
fi
Bevezettem egy környezeti változót (MASTER_IS_SLAVE_TOO) melynek hatására a mester konténer szolga is lesz egyben, a változó nélkül csak mesteri teendőit látja el.

Dockerfile
RUN sed -i "s|^# Extra Java CLASSPATH.*|&\nexport HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:\$HADOOP_CLASSPATH|" $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
Javítottam a HADOOP_CLASSPATHon, hiányzott egy Cassandrás függőség.

cassandra-clusternode.sh
if [ -n "$PUBLIC_INTERFACE" ]; then
    IP=$(ifconfig $PUBLIC_INTERFACE | awk '/inet addr/{print substr($2,6)}')
    PUBLIC_IP=$IP
fi
if [ -n "$PUBLIC_IP" ]; then
    sed -i -e "s/^# broadcast_address: 1.2.3.4/broadcast_address: $PUBLIC_IP/" $CASSANDRA_CONFIG/cassandra.yaml
fi
A Cassandrás konténerben egy új környezeti változóval, név szerint PUBLIC_INTERFACE, megoldottam, hogy a Cassandra a megfelelő IP címet használja minden nemű kommunikációhoz.
if [ -n "$CASSANDRA_SEEDS" ]; then
    for a in $(echo $CASSANDRA_SEEDS | sed 's/,/ /g'); do CASSANDRA_SEEDS=$(echo $CASSANDRA_SEEDS | sed "s/$a/$(ping -c1 $a | grep PING | awk '{ print $3 }' | sed "s/(//;s/)//")/"); done
fi
Mivel a konténerek dinamikusan kapnak IP címet, a Cassandra viszont csak IP alapján tud kapcsolódni a seed szerverekhez, ezért meg kellett trükköznöm a CASSANDRA_SEEDS változót, domain neveket és IP címeket is elfogad egyaránt, majd a Cassandra indítása előtt feloldja a domain neveket IP címekre.

Futtatás


slave1
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave1 --dns 192.168.50.15 -h cassandra1.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_TOKEN=-9223372036854775808" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
A Cassadnrás konténer logját a cassandra.log fájlban találjuk, érdemes a Hadoop cluster elindítása előtt összeállítani a Cassandra clustert (vagy külön terminálban nohup nélkül indítani), mert ha valami időzítési vagy hálózati probléma miatt nem találták meg egymást a nodeok, akkor elég kényelmetlen a Swarm clusterből törölgetni, majd újraindítgatni a megfelelő konténereket. Sokszor kellett a fejlesztés alatt hasonlót csinálnom, és a rendszer sem túl hiba tűrő, úgyhogy rászoktam, hogy minden lépés előtt ellenőrzöm, hogy az elemek a helyükre kerültek-e.

slave2
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave2 --dns 192.168.50.15 -h cassandra2.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=-3074457345618258603" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave3
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave3 --dns 192.168.50.15 -h cassandra3.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=3074457345618258602" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
master
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -v /vagrant:/vagrant -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash

Job


A hozzuk létre a projekt könyvtárban a KeyCollector.java fájlt az alábbi tartalommal:
import java.io.IOException;
import java.util.*;
import java.nio.ByteBuffer;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.ByteBufferUtil;

public class KeyCollector {

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: KeyCollector <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(KeyCollector.class);
        conf.setJobName("KeyCollector");

        ConfigHelper.setInputInitialAddress(conf, "cassandra1.lo");
        ConfigHelper.setInputColumnFamily(conf, "HadoopTest", "content");
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        conf.setInputFormat(ColumnFamilyInputFormat.class);

        conf.setMapperClass(KeyCollectorMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(args[0]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setReducerClass(KeyCollectorReducer.class);

        JobClient.runJob(conf);
    }

    public static class KeyCollectorMapper extends MapReduceBase implements Mapper<ByteBuffer, Map<ByteBuffer, BufferCell>, Text, IntWritable> {
        public void map(ByteBuffer key, Map<ByteBuffer, BufferCell> columns, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String textKey = ByteBufferUtil.string(key);
            output.collect(new Text(textKey), new IntWritable(1));
        }
    }

    public static class KeyCollectorReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}
Valószínűleg ez a világ legértelmetlenebb map/reduce jobja, összegyűjti a column familybe lévő kulcsokat, de ez tűnt a legegyszerűbb implementációnak. Természetesen lett volna lehetőség a job eredményét a Cassandrába tárolni, de a letisztultság jegyében, én a fájl rendszert preferáltam. Fordítsuk le az osztályt és csomagoljuk be egy jar-ba a mester Hadoop konténerben.
yum install -y java-1.8.0-openjdk-devel
cd /vagrant
mkdir build
classpath=.
for jar in /usr/share/cassandra/*.jar; do classpath=$classpath:$jar; done
for jar in /usr/share/cassandra/lib/*.jar; do classpath=$classpath:$jar; done
for jar in `find /usr/local/hadoop/share/hadoop/ *.jar`; do classpath=$classpath:$jar; done
javac -classpath $classpath -d build KeyCollector.java
jar -cvf KeyCollector.jar -C build/ .
Következő lépés, hogy ellenőrizzük a cluster működését, és teszt adattal töltjük fel az adatbázist, szintén a mester konténerből.
nodetool -h cassandra1.lo status
cassandra-cli -h casandra1.lo
create keyspace HadoopTest with strategy_options = {replication_factor:2} and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';
use HadoopTest;
create column family content with comparator = UTF8Type and key_validation_class = UTF8Type and default_validation_class = UTF8Type and column_metadata = [ {column_name: text, validation_class:UTF8Type} ];
set content['apple']['text'] = 'apple apple red apple bumm';
set content['pear']['text'] = 'pear pear yellow pear bumm';
Elérkezett a várv várt pillanat, futtathatjuk a jobot.
$HADOOP_PREFIX/bin/hadoop jar KeyCollector.jar KeyCollector output
$HADOOP_PREFIX/bin/hdfs dfs -cat output/*
A kimenetben láthatjuk, hogy egy darab apple és egy darab pear kulcs van az adatbázisban.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/cassandra/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
apple: 1
pear: 1

Teljesítmény


Befejezésül ejtsünk pár szót a rendszer teljesítmény optimalizálásáról.

A Cassandra elég korai verziójában bevezetésre kerültek az un. vNodeok, amik azt biztosítják, hogy egy Cassandra valós node a token tartomány több szeletét is birtokolhassa egyszerre. A Cassandra oldaláról számos előnye van ennek a megoldásnak, viszont a Hadoop felöl érkezve kifejezetten káros hatása van. A vNodeok száma kihatással van a szeletek (split) számára, ami annyit tesz, hogy annyi darab szelet egészen biztos lesz, ahány vNode van engedélyezve.

A következő paraméter, amire érdemes figyelni, az a szeletek mérete. Az alapértelmezett szelet mérete 64K, ha mondjuk van 5,000,000,000 sor az adatbázisban, akkor 64,000-el számolva 78,125 szeletben fogja elvégezni a műveletet a Hadoop, ami szeletenkénti 10 másodperccel szorozva száz óra körüli futás időt eredményez. Az alábbi sorral konfigurálhatjuk a szeletek méretét.
ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000);
Mivel a feldolgozó egységekben korlátozott mennyiségű memória áll rendelkezésre, javasolt finom-hangolni az egyszerre feldolgozott adatok számát, a példában 100-as kötegekben fogja a CQL driver a sorokat prezentálni, az alapértelmezett 1000 darab helyett.
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
A negyedik megkerülhetetlen téma az un. data locality. A jó teljesítmény eléréséhez elengedhetetlen, hogy az adatokat a legkisebb mértékben kelljen mozgatni. A Cassandra fejlesztői az InputFormat implementálása során szerencsére gondoltak erre a probléma forrásra, annyi a dolgunk, hogy ugyanarra a hostra telepítjük a Cassandrát, és a Hadoop JobTrackert. Sajnos Docker konténereket használva ezt bukjuk, mert a két konténer mindig két különálló host lesz, gyakorlatilag mintha külön gépen futnának a szolgáltatások. Megtehetnénk, hogy összeházasítjuk a két konténert, de akkor a Docker szemlélettel mennénk szembe, miszerint minden szolgáltatás egy önálló konténer legyen. Ha ezt el szeretnénk kerülni, akkor bizony be kell koszolnunk a kezünket. Én első lépésben megnyitottam a legfrissebb dokumentációt. Láthatjuk, hogy semmi nem található benne a témával kapcsolatban, de ne keseredjünk el, ugyanis van egy őse ennek a dokumentumnak, nyissuk meg a AbstractColumnFamilyInputFormat fájlt is. A public List<InputSplit> getSplits(JobContext context) metódusban történik a csoda, itt állítja össze a Hadoop a szeleteket, minden egyes szeletet egy InputSplit objektum reprezentál, és tartalmazza azt/azokat a host neveket, ahol az adat megtalálható.

A jelen cluster egy szoftveres hálózaton kommunikál, így ebben az esetben nincs jelentősége az adatok tényleges helyének, nem is bonyolítanám tovább a rendszert saját InputFormat írásával és/vagy különböző DNS trükkök bevezetésével. A jó megoldást mindenféleképpen az adott hálózati architektúrának megfelelően kell kialakítani. Akit érdekelnek további a teljesítménnyel kapcsolatos kérdések itt talál pár választ.

2015. május 25., hétfő

Konténerezett Hadoop és Cassandra cluster konfigurálása - második rész

Az előző részben sikeresen elindítottuk a Hadoop clustert, annyi szépség hibával, hogy a Docker konténerek a Vagrantos gépek IP címeire telepedtek --net=host kapcsolóval, ami bizonyos körülmények között elfogadható, de mindenképpen szeretném elkerülni a host IP-k használatát. A most következő részben három dologgal fogom felturbózni az infrastruktúrát. Szeretném a gépeket Swarm clusterben összefogni, hogy DNS segítségével találják meg egymást, egy privát hálózaton. A Swarm a Docker Engine képességeit terjeszti ki egy egész gép csoportra, lényegében a megszokott szolgáltatásokat kapjuk, csak nem egy gépre levetítve, hanem egy egész fürtre. A Weave gondoskodik a konténer közi hálózatról, és bár rendelkezik saját DNS megoldással, számomra a Docker-spy szimpatikusabb választásnak tűnt, mert egyszerűbb telepíteni és üzemeltetni.

Előkészítés

Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra:
cd hadoop-docker && git checkout 2.6.0-dns && git pull origin 2.6.0-dns
cd .. && git checkout 2.6.0-dns && git pull origin 2.6.0-dns
A Docker Swarmot és egyéb szükséges szolgáltatásokat egy újabb virtuális gépre telepítettem, kénytelen voltam a hálózati címeket megváltoztatni, mert a Swarm menedzser nem volt hajlandó az 1-es végű gépre csatlakozni, kitöröltem a hostnév konfigurációt, hiszen többé nem lesz rá szükség, illetve a Weave hálózathoz szükséges újabb IP-ket konfiguráltam:
config.vm.define "swarm" do |sw|
 sw.vm.network "private_network", ip: "192.168.50.15"

 sw.vm.provision :shell, inline: "sh /vagrant/swarm.sh 10.2.0.15/16 10.2.15.0/24"

 config.vm.provider :virtualbox do |vb|
  vb.customize ["modifyvm", :id, "--memory", "512"]
 end
end
Élesszük fel a gépeket:
vagrant halt && vagrant up
Majd a futó Vagrantos környezetben építsük újra a konténereket illetve eszközöljük az egyéb változtatásokat:
vagrant provision master slave1 slave2 slave3
Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)

Változások

Mielőtt nekilátnánk a szolgáltatások elindításához vegyük sorra pontosan miket kellett átalakítani a kódban.

common.sh
sed -i "s/exit 0//" /etc/rc.local
if [ -z "$(cat /etc/rc.local | grep 'docker restart')" ]; then
        echo "service docker restart" >> /etc/rc.local
fi
Biztos ami biztos a Dockert kényszerítem, hogy elinduljon.
cat > /etc/network/interfaces.d/weave.cfg << EOF
auto weave
iface weave inet static
        address $(echo $1 | sed "s|/.*||")
        network 10.2.0.0
        netmask 255.255.0.0
        broadcast 0.0.0.0
        bridge_ports none
        bridge_maxwaitout 0
EOF
Létrehoztam egy hálózati hidat a Weave számára az első paraméterben (pl. 10.2.0.15/16) megadott IP címmel.
apt-get update && apt-get -y install bridge-utils curl docker.io
echo "" > /etc/default/docker
service docker restart
Telepítem a bridge-utils csomagot ami elengedhetetlen a perzisztens hálózati hídhoz, majd kényszerítem a Dockert, hogy alap beállításokkal induljon el.
if [ ! -f /usr/local/bin/weave ]; then
    wget -O /usr/local/bin/weave https://github.com/weaveworks/weave/releases/download/latest_release/weave
    chmod a+x /usr/local/bin/weave
fi
if [ -z "$(ifconfig | grep weave)" ]; then
    weave create-bridge
    ip addr add dev weave $1
fi
Telepítem a Weavet, és mivel az előzőekben definiált hálózati híd a következő újraindítás során válik csak perzisztenssé, ezért most az egyszer kézzel létrehozom azt a Weave beépített eszközével.
echo "DOCKER_OPTS='-H tcp://$(ifconfig eth1 | awk '/inet addr/{print substr($2,6)}'):2375 -H unix:///var/run/docker.sock --bridge=weave --fixed-cidr=$2'" > /etc/default/docker
service docker restart
Az eth1 nevű interfésszel összekapcsolom a Dockert, így az kívülről is elérhetővé válik, továbbra is használatban marad a Unix socket, definiálom a hidat mint hálózati csatolót, és garantálom, hogy a Docker minden konténernek teljesen egyedi IP-t adjon cluster szerte a második paraméterben (pl. 10.2.15.0/24) megadott IP tartománnyal.

A konténer konfigurációjában is kellett kicsit piszkálni, a bootstrap.sh-ban lecseréltem a statikus IP konfigurációt host nevekre:
if [ -z "$MASTER" ]; then
   HOST_NAME=$(hostname)
   echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/masters
   echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves
   if [ -n "$SLAVES" ]; then
      echo "$SLAVES" | sed -e "s/,/\n/g" >> $HADOOP_PREFIX/etc/hadoop/slaves
   fi
 
   for a in `ls $HADOOP_PREFIX/etc/hadoop/*site.xml`; do sed -i "s/master/$HOST_NAME/g" $a; done
else
   if [ -n "$MASTER" ]; then
      for a in `ls $HADOOP_PREFIX/etc/hadoop/*site.xml`; do sed -i "s/master/$MASTER/g" $a; done
   fi
fi
Többé nem a mesterrel kell környezeti változóval tudatni, hogy mester módban induljon el, hanem a szolgáknak kell megadni a mester host nevét. A mester kicseréli a *.site.xml konfigurációs állományokban a master nevet a saját host nevére, a szolgák pedig a megadott névre.

Futtatás

A rendszer automatizálása eddig a pontig tartott, az egyes komponenseket már kézzel kell elindítani. A swarm nevű gépen indítsuk el a Weave hálózatot, hozzuk létre a Swarm cluster és indítsuk el a menedzsert:
weave launch
docker run --rm swarm create > /vagrant/currenttoken
docker run -d -p 1234:2375 swarm manage token://$(cat /vagrant/currenttoken)
Majd a többi gépen csatlakozzunk a hálózathoz és a clusterhez:
weave launch 192.168.50.15
docker run -d swarm join --addr=$(ifconfig eth1 | awk '/inet addr/{print substr($2,6)}'):2375 token://$(cat /vagrant/currenttoken)
Ellenőrizzük, hogy a Swarm cluster megfelelően jött-e létre:
docker -H tcp://192.168.50.15:1234 info
Látnunk kell, hogy 3 gépből áll a fürt (ne essünk kétségbe, ha rögtön nem jelennek meg a gépek, van egy kis késleltetése a rendszernek). A feladat első részével végeztünk is, ha megfelelően paraméterezve elindítjuk a konténereket, akkor a hálózaton látni fogják egymást. Nagyobb cluster méret esetén érdemes még un. Service discovery réteggel kibővíteni  a rendszert. Támogatott implementáció akad bőven Etcd, Consul, Zookeeper, nem szeretnék a részletekbe belemenni, mindenki használja a saját preferáltját. A SD-nek két fontos szerepe van: az egyik, hogy a Swarm menedzser azon keresztül találja meg a gépeket a clusterben, a másik, hogy a gépek egymást is azon keresztül találják meg a hálózaton. A jelenlegi gép méret az első opciót nem igazán indokolja, DNS szolgáltatásnak pedig mondhatni ágyúval verébre esete áll fenn, így valami egyszerűbb megoldást választottam. Számtalan DNS projekt érhető el Docker ökoszisztémában, a legtöbb arra alapozza a működését, hogy listenereket aggat a Docker registryre és elcsípik amikor egy konténer létrejön vagy megszűnik. Ahogy említettem és a Docker-spyt választottam, és ennek megfelelően indítsuk is el azt a swarm nevű gépen.
docker run --name docker-spy -e "DOCKER_HOST=tcp://192.168.50.15:1234" -e "DNS_DOMAIN=lo" -p 53:53/udp -p 53:53 -v /var/run/docker.sock:/var/run/docker.sock iverberk/docker-spy
Ez a konténer nem csatlakozik a Swarm clusterhez, viszont a cluster eseményeire reagál, és az lo végződésű host neveket regisztrálja a memóriában, a többi kérést pedig a Google népszerű név feloldó szervere felé továbbítja, így nem kell aggódnunk, hogy csak a saját neveinket oldja fel.

Nincs más dolgunk, mint elindítani a Hadoop konténereket.

slave1
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave2
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave3
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
master
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
Ha mindent jól csináltunk, akkor a docker-spy konténer logjában látnunk kell, ahogyan regisztrálja a host neveket, kedvünre futtathatjuk a nekünk tetsző map/reduce jobot.

A következő részben egy Cassandra cluster beüzemelésének lépéseit tervezem bemutatni, némi teljesítmény hangolással, és egy map/reduce job futtatásával.