Előkészítés
Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra, és építsük újra a konténereket:cd docker-cassandra && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra cd ../hadoop-docker && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra cd .. && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandraÉlesszük fel a gépeket:
vagrant halt && vagrant upMajd a futó Vagrantos környezetben építsük újra a konténereket:
vagrant provision master slave1 slave2 slave3Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)Ha mindezzel végeztünk, hozzuk létre ismét a Weave hálózatot és a Swarm clustert, a Hadoop konténereket egyelőre hagyjuk parlagon.
Változások
Csökkentettem a master nevű gép memória igényét, mivel a továbbiakban csak mesterként fog szolgálni a benne futó Hadoop konténer, és ezzel egy időben növeltem a slaveX nevű gépek memóriáját, mert az eddigi beállítás ki volt hegyezve a Hadoopra, mostantól viszont a Cassandrának is kell helyet szorítani. A fejlesztés előrehaladtával a gépemben lévő 8 Gb RAM már sokszor kevésnek bizonyult, elkezdte a Vagrant aktívan használni a swapet, ami igen rossz hatással van a jobok futtatására, lépten nyomon elhasaltak. Én átmenetileg a 3-s számú slave gépet kikapcsoltam. Összességében 4 virtuális gépből és 15 konténerből áll a cluster, szóval személy szerint nem is csodálkozom, hogy ilyen mértékben megnövekedett a gép igény.bootstrap.sh
if [ -n "$MASTER_IS_SLAVE_TOO" ]; then echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves else echo "" > $HADOOP_PREFIX/etc/hadoop/slaves fiBevezettem egy környezeti változót (MASTER_IS_SLAVE_TOO) melynek hatására a mester konténer szolga is lesz egyben, a változó nélkül csak mesteri teendőit látja el.
Dockerfile
RUN sed -i "s|^# Extra Java CLASSPATH.*|&\nexport HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:\$HADOOP_CLASSPATH|" $HADOOP_PREFIX/etc/hadoop/hadoop-env.shJavítottam a HADOOP_CLASSPATHon, hiányzott egy Cassandrás függőség.
cassandra-clusternode.sh
if [ -n "$PUBLIC_INTERFACE" ]; then IP=$(ifconfig $PUBLIC_INTERFACE | awk '/inet addr/{print substr($2,6)}') PUBLIC_IP=$IP fi if [ -n "$PUBLIC_IP" ]; then sed -i -e "s/^# broadcast_address: 1.2.3.4/broadcast_address: $PUBLIC_IP/" $CASSANDRA_CONFIG/cassandra.yaml fiA Cassandrás konténerben egy új környezeti változóval, név szerint PUBLIC_INTERFACE, megoldottam, hogy a Cassandra a megfelelő IP címet használja minden nemű kommunikációhoz.
if [ -n "$CASSANDRA_SEEDS" ]; then for a in $(echo $CASSANDRA_SEEDS | sed 's/,/ /g'); do CASSANDRA_SEEDS=$(echo $CASSANDRA_SEEDS | sed "s/$a/$(ping -c1 $a | grep PING | awk '{ print $3 }' | sed "s/(//;s/)//")/"); done fiMivel a konténerek dinamikusan kapnak IP címet, a Cassandra viszont csak IP alapján tud kapcsolódni a seed szerverekhez, ezért meg kellett trükköznöm a CASSANDRA_SEEDS változót, domain neveket és IP címeket is elfogad egyaránt, majd a Cassandra indítása előtt feloldja a domain neveket IP címekre.
Futtatás
slave1
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave1 --dns 192.168.50.15 -h cassandra1.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_TOKEN=-9223372036854775808" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 & docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bashA Cassadnrás konténer logját a cassandra.log fájlban találjuk, érdemes a Hadoop cluster elindítása előtt összeállítani a Cassandra clustert (vagy külön terminálban nohup nélkül indítani), mert ha valami időzítési vagy hálózati probléma miatt nem találták meg egymást a nodeok, akkor elég kényelmetlen a Swarm clusterből törölgetni, majd újraindítgatni a megfelelő konténereket. Sokszor kellett a fejlesztés alatt hasonlót csinálnom, és a rendszer sem túl hiba tűrő, úgyhogy rászoktam, hogy minden lépés előtt ellenőrzöm, hogy az elemek a helyükre kerültek-e.
slave2
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave2 --dns 192.168.50.15 -h cassandra2.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=-3074457345618258603" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 & docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bashslave3
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave3 --dns 192.168.50.15 -h cassandra3.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=3074457345618258602" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 & docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bashmaster
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -v /vagrant:/vagrant -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
Job
A hozzuk létre a projekt könyvtárban a KeyCollector.java fájlt az alábbi tartalommal:
import java.io.IOException; import java.util.*; import java.nio.ByteBuffer; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.hadoop.*; import org.apache.cassandra.db.*; import org.apache.cassandra.utils.ByteBufferUtil; public class KeyCollector { public static void main(String[] args) throws IOException { if (args.length != 1) { System.err.println("Usage: KeyCollector <output path>"); System.exit(-1); } JobConf conf = new JobConf(KeyCollector.class); conf.setJobName("KeyCollector"); ConfigHelper.setInputInitialAddress(conf, "cassandra1.lo"); ConfigHelper.setInputColumnFamily(conf, "HadoopTest", "content"); ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner"); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text"))); ConfigHelper.setInputSlicePredicate(conf, predicate); conf.setInputFormat(ColumnFamilyInputFormat.class); conf.setMapperClass(KeyCollectorMapper.class); FileOutputFormat.setOutputPath(conf, new Path(args[0])); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setReducerClass(KeyCollectorReducer.class); JobClient.runJob(conf); } public static class KeyCollectorMapper extends MapReduceBase implements Mapper<ByteBuffer, Map<ByteBuffer, BufferCell>, Text, IntWritable> { public void map(ByteBuffer key, Map<ByteBuffer, BufferCell> columns, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String textKey = ByteBufferUtil.string(key); output.collect(new Text(textKey), new IntWritable(1)); } } public static class KeyCollectorReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } }Valószínűleg ez a világ legértelmetlenebb map/reduce jobja, összegyűjti a column familybe lévő kulcsokat, de ez tűnt a legegyszerűbb implementációnak. Természetesen lett volna lehetőség a job eredményét a Cassandrába tárolni, de a letisztultság jegyében, én a fájl rendszert preferáltam. Fordítsuk le az osztályt és csomagoljuk be egy jar-ba a mester Hadoop konténerben.
yum install -y java-1.8.0-openjdk-devel cd /vagrant mkdir build classpath=. for jar in /usr/share/cassandra/*.jar; do classpath=$classpath:$jar; done for jar in /usr/share/cassandra/lib/*.jar; do classpath=$classpath:$jar; done for jar in `find /usr/local/hadoop/share/hadoop/ *.jar`; do classpath=$classpath:$jar; done javac -classpath $classpath -d build KeyCollector.java jar -cvf KeyCollector.jar -C build/ .Következő lépés, hogy ellenőrizzük a cluster működését, és teszt adattal töltjük fel az adatbázist, szintén a mester konténerből.
nodetool -h cassandra1.lo status cassandra-cli -h casandra1.lo create keyspace HadoopTest with strategy_options = {replication_factor:2} and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'; use HadoopTest; create column family content with comparator = UTF8Type and key_validation_class = UTF8Type and default_validation_class = UTF8Type and column_metadata = [ {column_name: text, validation_class:UTF8Type} ]; set content['apple']['text'] = 'apple apple red apple bumm'; set content['pear']['text'] = 'pear pear yellow pear bumm';Elérkezett a várv várt pillanat, futtathatjuk a jobot.
$HADOOP_PREFIX/bin/hadoop jar KeyCollector.jar KeyCollector output $HADOOP_PREFIX/bin/hdfs dfs -cat output/*A kimenetben láthatjuk, hogy egy darab apple és egy darab pear kulcs van az adatbázisban.
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/share/cassandra/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] apple: 1 pear: 1
Teljesítmény
Befejezésül ejtsünk pár szót a rendszer teljesítmény optimalizálásáról.
A Cassandra elég korai verziójában bevezetésre kerültek az un. vNodeok, amik azt biztosítják, hogy egy Cassandra valós node a token tartomány több szeletét is birtokolhassa egyszerre. A Cassandra oldaláról számos előnye van ennek a megoldásnak, viszont a Hadoop felöl érkezve kifejezetten káros hatása van. A vNodeok száma kihatással van a szeletek (split) számára, ami annyit tesz, hogy annyi darab szelet egészen biztos lesz, ahány vNode van engedélyezve.
A következő paraméter, amire érdemes figyelni, az a szeletek mérete. Az alapértelmezett szelet mérete 64K, ha mondjuk van 5,000,000,000 sor az adatbázisban, akkor 64,000-el számolva 78,125 szeletben fogja elvégezni a műveletet a Hadoop, ami szeletenkénti 10 másodperccel szorozva száz óra körüli futás időt eredményez. Az alábbi sorral konfigurálhatjuk a szeletek méretét.
ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000);Mivel a feldolgozó egységekben korlátozott mennyiségű memória áll rendelkezésre, javasolt finom-hangolni az egyszerre feldolgozott adatok számát, a példában 100-as kötegekben fogja a CQL driver a sorokat prezentálni, az alapértelmezett 1000 darab helyett.
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));A negyedik megkerülhetetlen téma az un. data locality. A jó teljesítmény eléréséhez elengedhetetlen, hogy az adatokat a legkisebb mértékben kelljen mozgatni. A Cassandra fejlesztői az InputFormat implementálása során szerencsére gondoltak erre a probléma forrásra, annyi a dolgunk, hogy ugyanarra a hostra telepítjük a Cassandrát, és a Hadoop JobTrackert. Sajnos Docker konténereket használva ezt bukjuk, mert a két konténer mindig két különálló host lesz, gyakorlatilag mintha külön gépen futnának a szolgáltatások. Megtehetnénk, hogy összeházasítjuk a két konténert, de akkor a Docker szemlélettel mennénk szembe, miszerint minden szolgáltatás egy önálló konténer legyen. Ha ezt el szeretnénk kerülni, akkor bizony be kell koszolnunk a kezünket. Én első lépésben megnyitottam a legfrissebb dokumentációt. Láthatjuk, hogy semmi nem található benne a témával kapcsolatban, de ne keseredjünk el, ugyanis van egy őse ennek a dokumentumnak, nyissuk meg a AbstractColumnFamilyInputFormat fájlt is. A public List<InputSplit> getSplits(JobContext context) metódusban történik a csoda, itt állítja össze a Hadoop a szeleteket, minden egyes szeletet egy InputSplit objektum reprezentál, és tartalmazza azt/azokat a host neveket, ahol az adat megtalálható.
A jelen cluster egy szoftveres hálózaton kommunikál, így ebben az esetben nincs jelentősége az adatok tényleges helyének, nem is bonyolítanám tovább a rendszert saját InputFormat írásával és/vagy különböző DNS trükkök bevezetésével. A jó megoldást mindenféleképpen az adott hálózati architektúrának megfelelően kell kialakítani. Akit érdekelnek további a teljesítménnyel kapcsolatos kérdések itt talál pár választ.