2015. június 5., péntek

Konténerezett Hadoop és Cassandra cluster konfigurálása - harmadik rész

A sorozat előző részeiben (1, 2) Vagrantos környezetben felépítettünk egy Hadoop clustert. Ebben a befejező cikkben egy Cassandra fürtöt fogunk telepíteni, majd egy map/reduce jobot futtatunk a teljes clusteren. Izgalmasan hangzik, vágjunk is bele.

Előkészítés

Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra, és építsük újra a konténereket:
cd docker-cassandra && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd ../hadoop-docker && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd .. && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
Élesszük fel a gépeket:
vagrant halt && vagrant up
Majd a futó Vagrantos környezetben építsük újra a konténereket:
vagrant provision master slave1 slave2 slave3
Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)
Ha mindezzel végeztünk, hozzuk létre ismét a Weave hálózatot és a Swarm clustert, a Hadoop konténereket egyelőre hagyjuk parlagon.

Változások

Csökkentettem a master nevű gép memória igényét, mivel a továbbiakban csak mesterként fog szolgálni a benne futó Hadoop konténer, és ezzel egy időben növeltem a slaveX nevű gépek memóriáját, mert az eddigi beállítás ki volt hegyezve a Hadoopra, mostantól viszont a Cassandrának is kell helyet szorítani. A fejlesztés előrehaladtával a gépemben lévő 8 Gb RAM már sokszor kevésnek bizonyult, elkezdte a Vagrant aktívan használni a swapet, ami igen rossz hatással van a jobok futtatására, lépten nyomon elhasaltak. Én átmenetileg a 3-s számú slave gépet kikapcsoltam. Összességében 4 virtuális gépből és 15 konténerből áll a cluster, szóval személy szerint nem is csodálkozom, hogy ilyen mértékben megnövekedett a gép igény.

bootstrap.sh
if [ -n "$MASTER_IS_SLAVE_TOO" ]; then
    echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves
else
    echo "" > $HADOOP_PREFIX/etc/hadoop/slaves
fi
Bevezettem egy környezeti változót (MASTER_IS_SLAVE_TOO) melynek hatására a mester konténer szolga is lesz egyben, a változó nélkül csak mesteri teendőit látja el.

Dockerfile
RUN sed -i "s|^# Extra Java CLASSPATH.*|&\nexport HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:\$HADOOP_CLASSPATH|" $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
Javítottam a HADOOP_CLASSPATHon, hiányzott egy Cassandrás függőség.

cassandra-clusternode.sh
if [ -n "$PUBLIC_INTERFACE" ]; then
    IP=$(ifconfig $PUBLIC_INTERFACE | awk '/inet addr/{print substr($2,6)}')
    PUBLIC_IP=$IP
fi
if [ -n "$PUBLIC_IP" ]; then
    sed -i -e "s/^# broadcast_address: 1.2.3.4/broadcast_address: $PUBLIC_IP/" $CASSANDRA_CONFIG/cassandra.yaml
fi
A Cassandrás konténerben egy új környezeti változóval, név szerint PUBLIC_INTERFACE, megoldottam, hogy a Cassandra a megfelelő IP címet használja minden nemű kommunikációhoz.
if [ -n "$CASSANDRA_SEEDS" ]; then
    for a in $(echo $CASSANDRA_SEEDS | sed 's/,/ /g'); do CASSANDRA_SEEDS=$(echo $CASSANDRA_SEEDS | sed "s/$a/$(ping -c1 $a | grep PING | awk '{ print $3 }' | sed "s/(//;s/)//")/"); done
fi
Mivel a konténerek dinamikusan kapnak IP címet, a Cassandra viszont csak IP alapján tud kapcsolódni a seed szerverekhez, ezért meg kellett trükköznöm a CASSANDRA_SEEDS változót, domain neveket és IP címeket is elfogad egyaránt, majd a Cassandra indítása előtt feloldja a domain neveket IP címekre.

Futtatás


slave1
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave1 --dns 192.168.50.15 -h cassandra1.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_TOKEN=-9223372036854775808" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
A Cassadnrás konténer logját a cassandra.log fájlban találjuk, érdemes a Hadoop cluster elindítása előtt összeállítani a Cassandra clustert (vagy külön terminálban nohup nélkül indítani), mert ha valami időzítési vagy hálózati probléma miatt nem találták meg egymást a nodeok, akkor elég kényelmetlen a Swarm clusterből törölgetni, majd újraindítgatni a megfelelő konténereket. Sokszor kellett a fejlesztés alatt hasonlót csinálnom, és a rendszer sem túl hiba tűrő, úgyhogy rászoktam, hogy minden lépés előtt ellenőrzöm, hogy az elemek a helyükre kerültek-e.

slave2
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave2 --dns 192.168.50.15 -h cassandra2.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=-3074457345618258603" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave3
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave3 --dns 192.168.50.15 -h cassandra3.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=3074457345618258602" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
master
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -v /vagrant:/vagrant -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash

Job


A hozzuk létre a projekt könyvtárban a KeyCollector.java fájlt az alábbi tartalommal:
import java.io.IOException;
import java.util.*;
import java.nio.ByteBuffer;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.ByteBufferUtil;

public class KeyCollector {

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: KeyCollector <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(KeyCollector.class);
        conf.setJobName("KeyCollector");

        ConfigHelper.setInputInitialAddress(conf, "cassandra1.lo");
        ConfigHelper.setInputColumnFamily(conf, "HadoopTest", "content");
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        conf.setInputFormat(ColumnFamilyInputFormat.class);

        conf.setMapperClass(KeyCollectorMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(args[0]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setReducerClass(KeyCollectorReducer.class);

        JobClient.runJob(conf);
    }

    public static class KeyCollectorMapper extends MapReduceBase implements Mapper<ByteBuffer, Map<ByteBuffer, BufferCell>, Text, IntWritable> {
        public void map(ByteBuffer key, Map<ByteBuffer, BufferCell> columns, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String textKey = ByteBufferUtil.string(key);
            output.collect(new Text(textKey), new IntWritable(1));
        }
    }

    public static class KeyCollectorReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}
Valószínűleg ez a világ legértelmetlenebb map/reduce jobja, összegyűjti a column familybe lévő kulcsokat, de ez tűnt a legegyszerűbb implementációnak. Természetesen lett volna lehetőség a job eredményét a Cassandrába tárolni, de a letisztultság jegyében, én a fájl rendszert preferáltam. Fordítsuk le az osztályt és csomagoljuk be egy jar-ba a mester Hadoop konténerben.
yum install -y java-1.8.0-openjdk-devel
cd /vagrant
mkdir build
classpath=.
for jar in /usr/share/cassandra/*.jar; do classpath=$classpath:$jar; done
for jar in /usr/share/cassandra/lib/*.jar; do classpath=$classpath:$jar; done
for jar in `find /usr/local/hadoop/share/hadoop/ *.jar`; do classpath=$classpath:$jar; done
javac -classpath $classpath -d build KeyCollector.java
jar -cvf KeyCollector.jar -C build/ .
Következő lépés, hogy ellenőrizzük a cluster működését, és teszt adattal töltjük fel az adatbázist, szintén a mester konténerből.
nodetool -h cassandra1.lo status
cassandra-cli -h casandra1.lo
create keyspace HadoopTest with strategy_options = {replication_factor:2} and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';
use HadoopTest;
create column family content with comparator = UTF8Type and key_validation_class = UTF8Type and default_validation_class = UTF8Type and column_metadata = [ {column_name: text, validation_class:UTF8Type} ];
set content['apple']['text'] = 'apple apple red apple bumm';
set content['pear']['text'] = 'pear pear yellow pear bumm';
Elérkezett a várv várt pillanat, futtathatjuk a jobot.
$HADOOP_PREFIX/bin/hadoop jar KeyCollector.jar KeyCollector output
$HADOOP_PREFIX/bin/hdfs dfs -cat output/*
A kimenetben láthatjuk, hogy egy darab apple és egy darab pear kulcs van az adatbázisban.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/cassandra/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
apple: 1
pear: 1

Teljesítmény


Befejezésül ejtsünk pár szót a rendszer teljesítmény optimalizálásáról.

A Cassandra elég korai verziójában bevezetésre kerültek az un. vNodeok, amik azt biztosítják, hogy egy Cassandra valós node a token tartomány több szeletét is birtokolhassa egyszerre. A Cassandra oldaláról számos előnye van ennek a megoldásnak, viszont a Hadoop felöl érkezve kifejezetten káros hatása van. A vNodeok száma kihatással van a szeletek (split) számára, ami annyit tesz, hogy annyi darab szelet egészen biztos lesz, ahány vNode van engedélyezve.

A következő paraméter, amire érdemes figyelni, az a szeletek mérete. Az alapértelmezett szelet mérete 64K, ha mondjuk van 5,000,000,000 sor az adatbázisban, akkor 64,000-el számolva 78,125 szeletben fogja elvégezni a műveletet a Hadoop, ami szeletenkénti 10 másodperccel szorozva száz óra körüli futás időt eredményez. Az alábbi sorral konfigurálhatjuk a szeletek méretét.
ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000);
Mivel a feldolgozó egységekben korlátozott mennyiségű memória áll rendelkezésre, javasolt finom-hangolni az egyszerre feldolgozott adatok számát, a példában 100-as kötegekben fogja a CQL driver a sorokat prezentálni, az alapértelmezett 1000 darab helyett.
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
A negyedik megkerülhetetlen téma az un. data locality. A jó teljesítmény eléréséhez elengedhetetlen, hogy az adatokat a legkisebb mértékben kelljen mozgatni. A Cassandra fejlesztői az InputFormat implementálása során szerencsére gondoltak erre a probléma forrásra, annyi a dolgunk, hogy ugyanarra a hostra telepítjük a Cassandrát, és a Hadoop JobTrackert. Sajnos Docker konténereket használva ezt bukjuk, mert a két konténer mindig két különálló host lesz, gyakorlatilag mintha külön gépen futnának a szolgáltatások. Megtehetnénk, hogy összeházasítjuk a két konténert, de akkor a Docker szemlélettel mennénk szembe, miszerint minden szolgáltatás egy önálló konténer legyen. Ha ezt el szeretnénk kerülni, akkor bizony be kell koszolnunk a kezünket. Én első lépésben megnyitottam a legfrissebb dokumentációt. Láthatjuk, hogy semmi nem található benne a témával kapcsolatban, de ne keseredjünk el, ugyanis van egy őse ennek a dokumentumnak, nyissuk meg a AbstractColumnFamilyInputFormat fájlt is. A public List<InputSplit> getSplits(JobContext context) metódusban történik a csoda, itt állítja össze a Hadoop a szeleteket, minden egyes szeletet egy InputSplit objektum reprezentál, és tartalmazza azt/azokat a host neveket, ahol az adat megtalálható.

A jelen cluster egy szoftveres hálózaton kommunikál, így ebben az esetben nincs jelentősége az adatok tényleges helyének, nem is bonyolítanám tovább a rendszert saját InputFormat írásával és/vagy különböző DNS trükkök bevezetésével. A jó megoldást mindenféleképpen az adott hálózati architektúrának megfelelően kell kialakítani. Akit érdekelnek további a teljesítménnyel kapcsolatos kérdések itt talál pár választ.

Nincsenek megjegyzések:

Megjegyzés küldése