A következő címkéjű bejegyzések mutatása: Docker. Összes bejegyzés megjelenítése
A következő címkéjű bejegyzések mutatása: Docker. Összes bejegyzés megjelenítése

2015. június 5., péntek

Konténerezett Hadoop és Cassandra cluster konfigurálása - harmadik rész

A sorozat előző részeiben (1, 2) Vagrantos környezetben felépítettünk egy Hadoop clustert. Ebben a befejező cikkben egy Cassandra fürtöt fogunk telepíteni, majd egy map/reduce jobot futtatunk a teljes clusteren. Izgalmasan hangzik, vágjunk is bele.

Előkészítés

Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra, és építsük újra a konténereket:
cd docker-cassandra && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd ../hadoop-docker && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
cd .. && git checkout 2.6.0-cassandra && git pull origin 2.6.0-cassandra
Élesszük fel a gépeket:
vagrant halt && vagrant up
Majd a futó Vagrantos környezetben építsük újra a konténereket:
vagrant provision master slave1 slave2 slave3
Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)
Ha mindezzel végeztünk, hozzuk létre ismét a Weave hálózatot és a Swarm clustert, a Hadoop konténereket egyelőre hagyjuk parlagon.

Változások

Csökkentettem a master nevű gép memória igényét, mivel a továbbiakban csak mesterként fog szolgálni a benne futó Hadoop konténer, és ezzel egy időben növeltem a slaveX nevű gépek memóriáját, mert az eddigi beállítás ki volt hegyezve a Hadoopra, mostantól viszont a Cassandrának is kell helyet szorítani. A fejlesztés előrehaladtával a gépemben lévő 8 Gb RAM már sokszor kevésnek bizonyult, elkezdte a Vagrant aktívan használni a swapet, ami igen rossz hatással van a jobok futtatására, lépten nyomon elhasaltak. Én átmenetileg a 3-s számú slave gépet kikapcsoltam. Összességében 4 virtuális gépből és 15 konténerből áll a cluster, szóval személy szerint nem is csodálkozom, hogy ilyen mértékben megnövekedett a gép igény.

bootstrap.sh
if [ -n "$MASTER_IS_SLAVE_TOO" ]; then
    echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves
else
    echo "" > $HADOOP_PREFIX/etc/hadoop/slaves
fi
Bevezettem egy környezeti változót (MASTER_IS_SLAVE_TOO) melynek hatására a mester konténer szolga is lesz egyben, a változó nélkül csak mesteri teendőit látja el.

Dockerfile
RUN sed -i "s|^# Extra Java CLASSPATH.*|&\nexport HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:\$HADOOP_CLASSPATH|" $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
Javítottam a HADOOP_CLASSPATHon, hiányzott egy Cassandrás függőség.

cassandra-clusternode.sh
if [ -n "$PUBLIC_INTERFACE" ]; then
    IP=$(ifconfig $PUBLIC_INTERFACE | awk '/inet addr/{print substr($2,6)}')
    PUBLIC_IP=$IP
fi
if [ -n "$PUBLIC_IP" ]; then
    sed -i -e "s/^# broadcast_address: 1.2.3.4/broadcast_address: $PUBLIC_IP/" $CASSANDRA_CONFIG/cassandra.yaml
fi
A Cassandrás konténerben egy új környezeti változóval, név szerint PUBLIC_INTERFACE, megoldottam, hogy a Cassandra a megfelelő IP címet használja minden nemű kommunikációhoz.
if [ -n "$CASSANDRA_SEEDS" ]; then
    for a in $(echo $CASSANDRA_SEEDS | sed 's/,/ /g'); do CASSANDRA_SEEDS=$(echo $CASSANDRA_SEEDS | sed "s/$a/$(ping -c1 $a | grep PING | awk '{ print $3 }' | sed "s/(//;s/)//")/"); done
fi
Mivel a konténerek dinamikusan kapnak IP címet, a Cassandra viszont csak IP alapján tud kapcsolódni a seed szerverekhez, ezért meg kellett trükköznöm a CASSANDRA_SEEDS változót, domain neveket és IP címeket is elfogad egyaránt, majd a Cassandra indítása előtt feloldja a domain neveket IP címekre.

Futtatás


slave1
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave1 --dns 192.168.50.15 -h cassandra1.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_TOKEN=-9223372036854775808" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
A Cassadnrás konténer logját a cassandra.log fájlban találjuk, érdemes a Hadoop cluster elindítása előtt összeállítani a Cassandra clustert (vagy külön terminálban nohup nélkül indítani), mert ha valami időzítési vagy hálózati probléma miatt nem találták meg egymást a nodeok, akkor elég kényelmetlen a Swarm clusterből törölgetni, majd újraindítgatni a megfelelő konténereket. Sokszor kellett a fejlesztés alatt hasonlót csinálnom, és a rendszer sem túl hiba tűrő, úgyhogy rászoktam, hogy minden lépés előtt ellenőrzöm, hogy az elemek a helyükre kerültek-e.

slave2
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave2 --dns 192.168.50.15 -h cassandra2.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=-3074457345618258603" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave3
nohup docker -H tcp://192.168.50.15:1234 run --name cassandra-slave3 --dns 192.168.50.15 -h cassandra3.lo -e "PUBLIC_INTERFACE=eth0" -e "CASSANDRA_CLUSTERNAME=HadoopTest" -e "CASSANDRA_SEEDS=cassandra1.lo" -e "CASSANDRA_TOKEN=3074457345618258602" -t mhmxs/cassandra-cluster > cassandra.log 2>&1 &
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
master
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -v /vagrant:/vagrant -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash

Job


A hozzuk létre a projekt könyvtárban a KeyCollector.java fájlt az alábbi tartalommal:
import java.io.IOException;
import java.util.*;
import java.nio.ByteBuffer;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.hadoop.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.ByteBufferUtil;

public class KeyCollector {

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: KeyCollector <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(KeyCollector.class);
        conf.setJobName("KeyCollector");

        ConfigHelper.setInputInitialAddress(conf, "cassandra1.lo");
        ConfigHelper.setInputColumnFamily(conf, "HadoopTest", "content");
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        conf.setInputFormat(ColumnFamilyInputFormat.class);

        conf.setMapperClass(KeyCollectorMapper.class);

        FileOutputFormat.setOutputPath(conf, new Path(args[0]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setReducerClass(KeyCollectorReducer.class);

        JobClient.runJob(conf);
    }

    public static class KeyCollectorMapper extends MapReduceBase implements Mapper<ByteBuffer, Map<ByteBuffer, BufferCell>, Text, IntWritable> {
        public void map(ByteBuffer key, Map<ByteBuffer, BufferCell> columns, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String textKey = ByteBufferUtil.string(key);
            output.collect(new Text(textKey), new IntWritable(1));
        }
    }

    public static class KeyCollectorReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}
Valószínűleg ez a világ legértelmetlenebb map/reduce jobja, összegyűjti a column familybe lévő kulcsokat, de ez tűnt a legegyszerűbb implementációnak. Természetesen lett volna lehetőség a job eredményét a Cassandrába tárolni, de a letisztultság jegyében, én a fájl rendszert preferáltam. Fordítsuk le az osztályt és csomagoljuk be egy jar-ba a mester Hadoop konténerben.
yum install -y java-1.8.0-openjdk-devel
cd /vagrant
mkdir build
classpath=.
for jar in /usr/share/cassandra/*.jar; do classpath=$classpath:$jar; done
for jar in /usr/share/cassandra/lib/*.jar; do classpath=$classpath:$jar; done
for jar in `find /usr/local/hadoop/share/hadoop/ *.jar`; do classpath=$classpath:$jar; done
javac -classpath $classpath -d build KeyCollector.java
jar -cvf KeyCollector.jar -C build/ .
Következő lépés, hogy ellenőrizzük a cluster működését, és teszt adattal töltjük fel az adatbázist, szintén a mester konténerből.
nodetool -h cassandra1.lo status
cassandra-cli -h casandra1.lo
create keyspace HadoopTest with strategy_options = {replication_factor:2} and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';
use HadoopTest;
create column family content with comparator = UTF8Type and key_validation_class = UTF8Type and default_validation_class = UTF8Type and column_metadata = [ {column_name: text, validation_class:UTF8Type} ];
set content['apple']['text'] = 'apple apple red apple bumm';
set content['pear']['text'] = 'pear pear yellow pear bumm';
Elérkezett a várv várt pillanat, futtathatjuk a jobot.
$HADOOP_PREFIX/bin/hadoop jar KeyCollector.jar KeyCollector output
$HADOOP_PREFIX/bin/hdfs dfs -cat output/*
A kimenetben láthatjuk, hogy egy darab apple és egy darab pear kulcs van az adatbázisban.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/cassandra/lib/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
apple: 1
pear: 1

Teljesítmény


Befejezésül ejtsünk pár szót a rendszer teljesítmény optimalizálásáról.

A Cassandra elég korai verziójában bevezetésre kerültek az un. vNodeok, amik azt biztosítják, hogy egy Cassandra valós node a token tartomány több szeletét is birtokolhassa egyszerre. A Cassandra oldaláról számos előnye van ennek a megoldásnak, viszont a Hadoop felöl érkezve kifejezetten káros hatása van. A vNodeok száma kihatással van a szeletek (split) számára, ami annyit tesz, hogy annyi darab szelet egészen biztos lesz, ahány vNode van engedélyezve.

A következő paraméter, amire érdemes figyelni, az a szeletek mérete. Az alapértelmezett szelet mérete 64K, ha mondjuk van 5,000,000,000 sor az adatbázisban, akkor 64,000-el számolva 78,125 szeletben fogja elvégezni a műveletet a Hadoop, ami szeletenkénti 10 másodperccel szorozva száz óra körüli futás időt eredményez. Az alábbi sorral konfigurálhatjuk a szeletek méretét.
ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000);
Mivel a feldolgozó egységekben korlátozott mennyiségű memória áll rendelkezésre, javasolt finom-hangolni az egyszerre feldolgozott adatok számát, a példában 100-as kötegekben fogja a CQL driver a sorokat prezentálni, az alapértelmezett 1000 darab helyett.
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
A negyedik megkerülhetetlen téma az un. data locality. A jó teljesítmény eléréséhez elengedhetetlen, hogy az adatokat a legkisebb mértékben kelljen mozgatni. A Cassandra fejlesztői az InputFormat implementálása során szerencsére gondoltak erre a probléma forrásra, annyi a dolgunk, hogy ugyanarra a hostra telepítjük a Cassandrát, és a Hadoop JobTrackert. Sajnos Docker konténereket használva ezt bukjuk, mert a két konténer mindig két különálló host lesz, gyakorlatilag mintha külön gépen futnának a szolgáltatások. Megtehetnénk, hogy összeházasítjuk a két konténert, de akkor a Docker szemlélettel mennénk szembe, miszerint minden szolgáltatás egy önálló konténer legyen. Ha ezt el szeretnénk kerülni, akkor bizony be kell koszolnunk a kezünket. Én első lépésben megnyitottam a legfrissebb dokumentációt. Láthatjuk, hogy semmi nem található benne a témával kapcsolatban, de ne keseredjünk el, ugyanis van egy őse ennek a dokumentumnak, nyissuk meg a AbstractColumnFamilyInputFormat fájlt is. A public List<InputSplit> getSplits(JobContext context) metódusban történik a csoda, itt állítja össze a Hadoop a szeleteket, minden egyes szeletet egy InputSplit objektum reprezentál, és tartalmazza azt/azokat a host neveket, ahol az adat megtalálható.

A jelen cluster egy szoftveres hálózaton kommunikál, így ebben az esetben nincs jelentősége az adatok tényleges helyének, nem is bonyolítanám tovább a rendszert saját InputFormat írásával és/vagy különböző DNS trükkök bevezetésével. A jó megoldást mindenféleképpen az adott hálózati architektúrának megfelelően kell kialakítani. Akit érdekelnek további a teljesítménnyel kapcsolatos kérdések itt talál pár választ.

2015. május 25., hétfő

Konténerezett Hadoop és Cassandra cluster konfigurálása - második rész

Az előző részben sikeresen elindítottuk a Hadoop clustert, annyi szépség hibával, hogy a Docker konténerek a Vagrantos gépek IP címeire telepedtek --net=host kapcsolóval, ami bizonyos körülmények között elfogadható, de mindenképpen szeretném elkerülni a host IP-k használatát. A most következő részben három dologgal fogom felturbózni az infrastruktúrát. Szeretném a gépeket Swarm clusterben összefogni, hogy DNS segítségével találják meg egymást, egy privát hálózaton. A Swarm a Docker Engine képességeit terjeszti ki egy egész gép csoportra, lényegében a megszokott szolgáltatásokat kapjuk, csak nem egy gépre levetítve, hanem egy egész fürtre. A Weave gondoskodik a konténer közi hálózatról, és bár rendelkezik saját DNS megoldással, számomra a Docker-spy szimpatikusabb választásnak tűnt, mert egyszerűbb telepíteni és üzemeltetni.

Előkészítés

Első dolgunk, hogy meglévő projektünket frissítjük a megfelelő verzióra:
cd hadoop-docker && git checkout 2.6.0-dns && git pull origin 2.6.0-dns
cd .. && git checkout 2.6.0-dns && git pull origin 2.6.0-dns
A Docker Swarmot és egyéb szükséges szolgáltatásokat egy újabb virtuális gépre telepítettem, kénytelen voltam a hálózati címeket megváltoztatni, mert a Swarm menedzser nem volt hajlandó az 1-es végű gépre csatlakozni, kitöröltem a hostnév konfigurációt, hiszen többé nem lesz rá szükség, illetve a Weave hálózathoz szükséges újabb IP-ket konfiguráltam:
config.vm.define "swarm" do |sw|
 sw.vm.network "private_network", ip: "192.168.50.15"

 sw.vm.provision :shell, inline: "sh /vagrant/swarm.sh 10.2.0.15/16 10.2.15.0/24"

 config.vm.provider :virtualbox do |vb|
  vb.customize ["modifyvm", :id, "--memory", "512"]
 end
end
Élesszük fel a gépeket:
vagrant halt && vagrant up
Majd a futó Vagrantos környezetben építsük újra a konténereket illetve eszközöljük az egyéb változtatásokat:
vagrant provision master slave1 slave2 slave3
Lépjünk be a virtuális gépekre, és töröljünk minden futó konténert:
docker rm -f $(docker ps -qa)

Változások

Mielőtt nekilátnánk a szolgáltatások elindításához vegyük sorra pontosan miket kellett átalakítani a kódban.

common.sh
sed -i "s/exit 0//" /etc/rc.local
if [ -z "$(cat /etc/rc.local | grep 'docker restart')" ]; then
        echo "service docker restart" >> /etc/rc.local
fi
Biztos ami biztos a Dockert kényszerítem, hogy elinduljon.
cat > /etc/network/interfaces.d/weave.cfg << EOF
auto weave
iface weave inet static
        address $(echo $1 | sed "s|/.*||")
        network 10.2.0.0
        netmask 255.255.0.0
        broadcast 0.0.0.0
        bridge_ports none
        bridge_maxwaitout 0
EOF
Létrehoztam egy hálózati hidat a Weave számára az első paraméterben (pl. 10.2.0.15/16) megadott IP címmel.
apt-get update && apt-get -y install bridge-utils curl docker.io
echo "" > /etc/default/docker
service docker restart
Telepítem a bridge-utils csomagot ami elengedhetetlen a perzisztens hálózati hídhoz, majd kényszerítem a Dockert, hogy alap beállításokkal induljon el.
if [ ! -f /usr/local/bin/weave ]; then
    wget -O /usr/local/bin/weave https://github.com/weaveworks/weave/releases/download/latest_release/weave
    chmod a+x /usr/local/bin/weave
fi
if [ -z "$(ifconfig | grep weave)" ]; then
    weave create-bridge
    ip addr add dev weave $1
fi
Telepítem a Weavet, és mivel az előzőekben definiált hálózati híd a következő újraindítás során válik csak perzisztenssé, ezért most az egyszer kézzel létrehozom azt a Weave beépített eszközével.
echo "DOCKER_OPTS='-H tcp://$(ifconfig eth1 | awk '/inet addr/{print substr($2,6)}'):2375 -H unix:///var/run/docker.sock --bridge=weave --fixed-cidr=$2'" > /etc/default/docker
service docker restart
Az eth1 nevű interfésszel összekapcsolom a Dockert, így az kívülről is elérhetővé válik, továbbra is használatban marad a Unix socket, definiálom a hidat mint hálózati csatolót, és garantálom, hogy a Docker minden konténernek teljesen egyedi IP-t adjon cluster szerte a második paraméterben (pl. 10.2.15.0/24) megadott IP tartománnyal.

A konténer konfigurációjában is kellett kicsit piszkálni, a bootstrap.sh-ban lecseréltem a statikus IP konfigurációt host nevekre:
if [ -z "$MASTER" ]; then
   HOST_NAME=$(hostname)
   echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/masters
   echo $HOST_NAME > $HADOOP_PREFIX/etc/hadoop/slaves
   if [ -n "$SLAVES" ]; then
      echo "$SLAVES" | sed -e "s/,/\n/g" >> $HADOOP_PREFIX/etc/hadoop/slaves
   fi
 
   for a in `ls $HADOOP_PREFIX/etc/hadoop/*site.xml`; do sed -i "s/master/$HOST_NAME/g" $a; done
else
   if [ -n "$MASTER" ]; then
      for a in `ls $HADOOP_PREFIX/etc/hadoop/*site.xml`; do sed -i "s/master/$MASTER/g" $a; done
   fi
fi
Többé nem a mesterrel kell környezeti változóval tudatni, hogy mester módban induljon el, hanem a szolgáknak kell megadni a mester host nevét. A mester kicseréli a *.site.xml konfigurációs állományokban a master nevet a saját host nevére, a szolgák pedig a megadott névre.

Futtatás

A rendszer automatizálása eddig a pontig tartott, az egyes komponenseket már kézzel kell elindítani. A swarm nevű gépen indítsuk el a Weave hálózatot, hozzuk létre a Swarm cluster és indítsuk el a menedzsert:
weave launch
docker run --rm swarm create > /vagrant/currenttoken
docker run -d -p 1234:2375 swarm manage token://$(cat /vagrant/currenttoken)
Majd a többi gépen csatlakozzunk a hálózathoz és a clusterhez:
weave launch 192.168.50.15
docker run -d swarm join --addr=$(ifconfig eth1 | awk '/inet addr/{print substr($2,6)}'):2375 token://$(cat /vagrant/currenttoken)
Ellenőrizzük, hogy a Swarm cluster megfelelően jött-e létre:
docker -H tcp://192.168.50.15:1234 info
Látnunk kell, hogy 3 gépből áll a fürt (ne essünk kétségbe, ha rögtön nem jelennek meg a gépek, van egy kis késleltetése a rendszernek). A feladat első részével végeztünk is, ha megfelelően paraméterezve elindítjuk a konténereket, akkor a hálózaton látni fogják egymást. Nagyobb cluster méret esetén érdemes még un. Service discovery réteggel kibővíteni  a rendszert. Támogatott implementáció akad bőven Etcd, Consul, Zookeeper, nem szeretnék a részletekbe belemenni, mindenki használja a saját preferáltját. A SD-nek két fontos szerepe van: az egyik, hogy a Swarm menedzser azon keresztül találja meg a gépeket a clusterben, a másik, hogy a gépek egymást is azon keresztül találják meg a hálózaton. A jelenlegi gép méret az első opciót nem igazán indokolja, DNS szolgáltatásnak pedig mondhatni ágyúval verébre esete áll fenn, így valami egyszerűbb megoldást választottam. Számtalan DNS projekt érhető el Docker ökoszisztémában, a legtöbb arra alapozza a működését, hogy listenereket aggat a Docker registryre és elcsípik amikor egy konténer létrejön vagy megszűnik. Ahogy említettem és a Docker-spyt választottam, és ennek megfelelően indítsuk is el azt a swarm nevű gépen.
docker run --name docker-spy -e "DOCKER_HOST=tcp://192.168.50.15:1234" -e "DNS_DOMAIN=lo" -p 53:53/udp -p 53:53 -v /var/run/docker.sock:/var/run/docker.sock iverberk/docker-spy
Ez a konténer nem csatlakozik a Swarm clusterhez, viszont a cluster eseményeire reagál, és az lo végződésű host neveket regisztrálja a memóriában, a többi kérést pedig a Google népszerű név feloldó szervere felé továbbítja, így nem kell aggódnunk, hogy csak a saját neveinket oldja fel.

Nincs más dolgunk, mint elindítani a Hadoop konténereket.

slave1
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave1 --dns 192.168.50.15 -h slave1.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave2
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave2 --dns 192.168.50.15 -h slave2.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
slave3
docker -H tcp://192.168.50.15:1234 run --name hadoop-slave3 --dns 192.168.50.15 -h slave3.lo -e "MASTER=master.lo" -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
master
docker -H tcp://192.168.50.15:1234 run --name hadoop-master --dns 192.168.50.15 -h master.lo -e "SLAVES=slave1.lo,slave2.lo,slave3.lo" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
Ha mindent jól csináltunk, akkor a docker-spy konténer logjában látnunk kell, ahogyan regisztrálja a host neveket, kedvünre futtathatjuk a nekünk tetsző map/reduce jobot.

A következő részben egy Cassandra cluster beüzemelésének lépéseit tervezem bemutatni, némi teljesítmény hangolással, és egy map/reduce job futtatásával.

2015. május 19., kedd

Konténerezett Hadoop és Cassandra cluster konfigurálása - első rész

A következő cikk-sorozatban egy Hadoop cluster telepítését mutatom be Cassandra támogatással, a méltán népszerű Docker konténerekbe zárva. Ebben a cikkben a Hadoop telepítése és konfigurálása kerül terítékre. Bár mindkét rendszer elindítható egyke módban, azért az elsődleges cél területük a nagy mennyiségű adat kezelés elosztott rendszereken, ha pedig a skálázhatóság is fontos szerepet játszik a kiszolgálásban, akkor a Docker kézenfekvő és divatos megoldás. A konténereket ésszerűen nem 0-ról építettem fel, hanem újrahasznosítottam a SequenceIQ Hadoop, és a Spotify Cassandra konténerét. Mindkét csapatról elmondható, hogy van sejtésük :) a témáról, így biztos alapnak éreztem az ő megoldásaikból kiindulni. A cikk készítése során fontos szempontnak tartottam, hogy bárki, aki megfelelő mennyiségű memóriával rendelkezik, a saját gépén próbálhassa ki a Hadoop és Cassandra házasságát, és ne kelljen valamilyen olcsó vagy drága felhő szolgáltatást előfizetnie. Választásom a Vagrantra esett, és segítségével virtualizálom a gépeket, a hálózatot, és egyéb erő forrásokat, és készítek privát hálózatot, amin keresztül a gépek elérik egymást. További előnye a döntésnek, hogy így a Hadoop memória kezelésébe is el kell mélyednünk, hiszen mindent minimalizálni kényszerültem, mivel a Hadoop alap konfiguráció óriási méretekre van optimalizálva, a konténerenknéti maximális 8192 Mb memória és 8 vcore legalábbis erről árulkodik.

A teljes projekt forrását elérhetővé tettem, így az első lépés a forrás megszerzése, és a Vagrant gépek elindítása. Az egyes gépek között 4.5 Gb memóriát osztottam szét (+oprendszer, +vagrant,+a böngésző amiben olvasod a cikket),  amennyiben nincs elegendő memória, kézzel kell a főbb gépeket elindítani. Türelmetlenek a cassandra.sh fájlban letilthatják a Cassandra konténerek építését, egy darabig úgysem lesz rájuk szükség.
git clone https://github.com/mhmxs/vagrant-host-hadoop-cassadra-cluster.git
cd vagrant-host-hadoop-cassadra-cluster
git submodule update --init
cd hadoop-docker && git checkout 2.6.0-static-ip
cd .. && git checkout 2.6.0-static-ip
vagrant up
Amíg a letöltés, telepítés, és egyéb gyártási folyamatok zajlanak kukkantsunk kicsit a motor háztető alá, lesz rá időnk bőven, ugyanis ezen a ponton nem sokat optimalizáltam, így sok tartalom többször is letöltésre kerül az egyes virtuális gépeken (javaslatokat várom a hozzászólásban). Íme a vagrant konfiguráció:
 config.vm.box = "ubuntu/vivid64"
 config.vm.provider "virtualbox" do |v|
  v.memory = 1024
 end
 config.vm.box_check_update = false
 config.vm.define "slave1" do |slave|
  slave.vm.network "private_network", ip: "192.168.50.1"
  slave.vm.provision :shell, inline: "hostname slave1 && sh /vagrant/cassandra.sh"
 end
 config.vm.define "slave2" do |slave|
  slave.vm.network "private_network", ip: "192.168.50.2"
  slave.vm.provision :shell, inline: "hostname slave2 && sh /vagrant/cassandra.sh"
 end
 config.vm.define "slave3" do |slave|
  slave.vm.network "private_network", ip: "192.168.50.3"
  slave.vm.provision :shell, inline: "hostname slave3 && sh /vagrant/cassandra.sh"
 end
 config.vm.define "master" do |master|
  master.vm.network "private_network", ip: "192.168.50.4"
  config.vm.provision :shell, inline: "hostname master && sh /vagrant/hadoop.sh"
  config.vm.provider :virtualbox do |vb|
   vb.customize ["modifyvm", :id, "--memory", "1536"]
  end
 end
Tehát van 4 gépünk egy hálózaton, egy mester és 3 szolga. A host név beállítás sajnos nem perzisztens, de a hibát már csak a következő verzióban javítottam, elnézést. A gépek száma tovább növelhető, az ököl szabály, hogy ami 3 gépen működik, az jó eséllyel működik többön is, ezért én a minimális darabszámot preferáltam. Elég sok leírást olvastam végig a Hadoop fürtök konfigurálásáról, és azonnal meg is jegyezném az egyik legnagyobb problémát az ökoszisztémával kapcsolatban. Olyan ütemben fejlődik a platform, hogy amit ma megírtam, az lehet, hogy holnap már nem is érvényes, vonatkozik ez az architektúrára (természetesen nem messziről szemlélve, az ördög a részletekben lakik), a beállításokra, és a fellépő hibákra is. Számtalan esetben az általam használt 2.6.0-ás verziójú Hadoop komponensben már nem is létezett az a kapcsoló, amivel javasolták az elcsípett kivétel kezelését, és kivétel akadt bőven.

Vegyük sorra milyen változtatásokat eszközöltem a beállításokban, de előtte tisztázzunk pár paramétert:
  • konténerek minimum memória foglalása: 4 Gb memória alatt 256 Mb az ajánlott
  • konténerek száma: min (2 * processzor magok, 1.8 * merevlemezek száma, Összes memória / konténer minimum memória foglalásával) - min(2, 1.8, 1024 / 256) = 2 egy kis csalással
  • rendszernek fenntartott memória: 4 Gb esetén 1 Gb, ennél kisebb értékre nincs ajánlás, így én az 512 Mb-ot választottam
  • összes felhasználható memória: rendelkezésre álló - fenntartott, 1024 - 512 = 512 Mb (gépenként 2 konténer)
További részletek és az ajánlott matek itt található.

Dockerfile
  • Telepítettem a Cassandrát
  • Mivel a Cassandra függősége az Open JDK 8, ezért eredeti Java konfigurációt eltávolítottam
  • Közös RSA kulcsot generáltam, hogy a konténerek között megoldható legyen a bejelentkezés jelszó nélkül
  • A Hadoop classpathjára betettem a Cassandrát
  • Pár további portot kitettem, exposoltam magyarul
  • Kicsit átrendeztem a parancsok sorrendjét, hogy minél gyorsabban lehessen konfigurációt változtatni, hála a rétegelt fájl-rendszernek a Docker csak a különbségig görgeti vissza a konténert, és onnan folytatja az építési műveletet tovább
core-site.xml
  • fs.defaultFS: hdfs://master:9000 - a master gépen fog futni a NameNode, és ezt jó, ha mindenkinek tudja
hdfs-site.xml
  • dfs.replication: 2 - a fájlrendszer replikációs faktorát emeltem meg 2-re
  • dfs.client.use.datanode.hostname: true - a dfs kliensek alap értelmezetten IP alapján kommunikálnak, ha több interfészünk, vagy több hálózati rétegünk van, akkor ez a működés okozhat fejfájást
  • dfs.datanode.use.datanode.hostname: true - szintén zenész
  • dfs.namenode.secondary.http-address: master:50090 - a másodlagos NameNode elérhetősége
  • dfs.namenode.http-address: master:50070 - az elsődleges NameNode elérhetősége
mapred-site.xml
  • mapreduce.jobtracker.address: master:8021 - szükségünk van egy JobTrackerre
  • mapreduce.map.memory.mb: 256 - map műveletet végző konténer memória foglalása
  • mapreduce.reduce.memory.mb: 512 - és a reducet végzőké
  • yarn.app.mapreduce.am.resource.mb: 512 - az AppMaster által felhasználható memória
  • yarn.app.mapreduce.am.job.client.port-range: 50200-50201 - alapértelmezetten un. ephemeral portot használ az AppMaster, az egyszerűség (tűzfal, port publikálás, stb.) kedvéért rögzítettem a tartományt
  • yarn.app.mapreduce.am.command-opts: -Xmx409m - AppMaster processz konfigurációja, ez okozott egy kis fejfájást, végül úgy találtam meg a problémát, hogy az eredeti forrásra egyesével rápakoltam a változtatásaimat, jó móka volt
  • mapreduce.jobhistory.address: master:10020 - elosztott környezetben szükség van JobHistory szerverre
  • mapreduce.jobhistory.webapp.address: master:19888 - igény esetén webes felületre is
  • mapreduce.application.classpath: ..., /usr/share/cassandra/*, /usr/share/cassandra/lib/* - az alapértelmezett útvonalakhoz hozzáadtam a Cassandrát
yarn-site.xml
  • yarn.resourcemanager.hostname: master
  • yarn.nodemanager.vmem-check-enabled: false - Centos 6 specifikus hiba jött elő, amit a virtuális memória ellenőrzésének kikapcsolásával tudtam megoldani. Bővebben a hibáról (6. Killing of Tasks Due to Virtual Memory Usage).
  • yarn.nodemanager.resource.cpu-vcores: 2 - a virtuális processzorok számát csökkentettem, így gépenként csak 2 osztható szét a konténerek között
  • yarn.nodemanager.resource.memory-mb: 512 - az a memória, amivel a helyi NodeManager rendelkezik
  • yarn.scheduler.minimum-allocation-mb: 256 - minimális memória foglalás konténerenként
  • yarn.scheduler.maximum-allocation-mb: 512 - és a maximális párja
  • yarn.nodemanager.address: ${yarn.nodemanager.hostname}:36123 - a konténer menedzser címe, alapértelmezetten a NodeManager választ portot magának, szintén specifikáltam
  • yarn.application.classpath: ..., /usr/share/cassandra/*, /usr/share/cassandra/lib/* - a Cassandrát elérhetővé tettem a Yarn számára
bootstrap.sh
echo $SLAVES | sed "s/,/\n/g" > /tmp/slaves
while read line; do printf "\n$line" >> /etc/hosts; done < /tmp/slaves
if [ -z "$(cat /etc/hosts | grep master)" ]; then
 printf "\n$MASTER_IP master" >> /etc/hosts
fi
if [ -n "$MASTER" ]; then
 echo "Starting master"
 
 rm /tmp/*.pid
 
 echo "master" > $HADOOP_PREFIX/etc/hadoop/masters
 echo "master" > $HADOOP_PREFIX/etc/hadoop/slaves
 while read line; do echo $line | sed -e "s/.*\s//" >> $HADOOP_PREFIX/etc/hadoop/slaves; done < /tmp/slaves
 
 $HADOOP_PREFIX/sbin/start-dfs.sh
 $HADOOP_PREFIX/bin/hdfs dfs -mkdir -p /user/root
 $HADOOP_PREFIX/sbin/start-yarn.sh
 $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh  --config $HADOOP_PREFIX/etc/hadoop start historyserver
else
 echo "Starting slave"

 if [[ $1 == "-bash" ]]; then
  echo "To read logs type: tail -f $HADOOP_PREFIX/logs/*.log"
 fi
fi
Gondoskodtam a megfelelő DNS beállításokról, szétválasztottam a futás jellegét a MASTER környezeti változó alapján, amit a konténer futtatásakor tudunk megadni, valamint indítottam egy JobHistory szervert a masteren. Az echo "master" > $HADOOP_PREFIX/etc/hadoop/slaves sor gondoskodik arról, hogy a mester is végezzen szolgai munkát, igény szerint ez eltávolítható, és egyúttal némi memória is felszabadítható a mester gépen.

Miután befejeződött a Vagrant gépek előkészítése, és a konténerek is elkészültek, nincs más hátra, mint elindítani a konténereket, vagy mégsem? Sajnos nem, ugyanis egy kivételbe futunk a job futtatása során, de menjünk egy kicsit bele a részletekbe.
vagrant ssh slave[1-3]
docker run --name hadoop -h slave[1-3] -e "MASTER_IP=192.168.50.4" -e "SLAVES=192.168.50.1 slave1,192.168.50.2 slave2,192.168.50.3 slave3" -p 50020:50020 -p 50010:50010 -p 50075:50075 -p 8040:8040 -p 8042:8042 -p 49707:49707 -p 2122:2122 -p 36123:36123 -p 50200-50210:50200-50210 -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
vagrant ssh master
docker run --name hadoop -h master -e "MASTER=true" -e "MASTER_IP=192.168.50.4" -e "SLAVES=192.168.50.1 slave1,192.168.50.2 slave2,192.168.50.3 slave3" -p 50020:50020 -p 50090:50090 -p 50070:50070 -p 50010:50010 -p 50075:50075 -p 9000:9000 -p 8021:8021 -p 8030:8030 -p 8031:8031 -p 8032:8032 -p 8033:8033 -p 8040:8040 -p 8042:8042 -p 49707:49707 -p 2122:2122 -p 8088:8088 -p 10020:10020 -p 19888:19888 -p 36123:36123 -p 50200-50210:50200-50210 -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
Bizonyosodjunk meg róla, hogy mind a 4 konténer rendben elindult-e és, hogy a cluster összeállt-e megfelelően:
bin/hdfs dfsadmin -report
bin/yarn node -list
Tegyünk egy próbát:
bin/hdfs dfs -mkdir -p input && bin/hdfs dfs -put $HADOOP_PREFIX/etc/hadoop/* input
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount input output
WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.net.NoRouteToHostException: No Route to Host from master/172.17.0.155 to 172.17.0.159:40896 failed on
socket timeout exception: java.net.NoRouteToHostException: No route to host; For more details see: http://wiki.apache.org/hadoop/NoRouteToHost
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:757)
at org.apache.hadoop.ipc.Client.call(Client.java:1472)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:244)
at com.sun.proxy.$Proxy8.getTask(Unknown Source)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:132)
Caused by: java.net.NoRouteToHostException: No route to host
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:494)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:607)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:705)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:368)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1521)
at org.apache.hadoop.ipc.Client.call(Client.java:1438)
Miközben újra futtattam a jobot megpróbáltam elcsípni a folyamatot, ami az imént a 40896 porton futott, most pedig a 39552-őt használta volna. Elég rövid volt az idő ablak, de én is elég fürge voltam.
# lsof -i tcp:39552
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 2717 root 235u IPv4 147112 0t0 TCP namenode:60573->172.17.0.15:39552 (SYN_SENT)
java 2733 root 235u IPv4 147111 0t0 TCP namenode:60572->172.17.0.15:39552 (SYN_SENT)
-bash-4.1# ps x | grep 2717
2717 ? Sl 0:01 /usr/lib/jvm/jre-1.8.0-openjdk/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx200m -Djava.io.tmpdir=/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1431716284376_0003/container_1431716284376_0003_01_000012/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/usr/local/hadoop/logs/userlogs/application_1431716284376_0003/container_1431716284376_0003_01_000012 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA org.apache.hadoop.mapred.YarnChild 172.17.0.15 39552 attempt_1431716284376_0003_m_000008_1 12
Elindul a szolgán egy YarnChild processz, ami az első paraméterben megadott IP-n szeretne kommunikálni a második paraméterben megadott ephemeral porton (a forráskódból derült ki), és a mester nem tud hozzá csatlakozni, hiszen nem a virtuális gép IP címe van megadva, amin a kommunikáció valójában zajlik, és nem is a szolga host neve, hanem a Docker konténer IP-je, amit a mester nem ér el. Kutakodtam a konfigurációk között, próbáltam a forrásból kideríteni, hogy mely beállítással lehetnék hatással erre a működésre, és szomorúan kellett tudomásul vennem, hogy erre nincs kapcsoló vagy nagyon elrejtették.

Miután a port publikálási megoldással zsákutcába jutottam az alábbi megoldások jöhettek számításba:
  • Használom a --net=host Docker kapcsolót, és akkor a konténerek közvetlenül a virtuális gép hálózati csatolójára ülnek - tiszta és száraz érzés
  • Privát hálózatot hozok létre pl. VPN - sajtreszelővel ...
  • Konfigurálok egy Weave hálózatot - na ez már izgalmasan hangzik
Az egyszerűség kedvéért elsőként kipróbáltam a --net=host kapcsolót, hogy lássam egyáltalán működik-e az összerakott rendszer.
vagrant ssh slave[1-3]
docker run --name hadoop --net=host -e "MASTER_IP=192.168.50.4" -e "SLAVES=192.168.50.1 slave1,192.168.50.2 slave2,192.168.50.3 slave3" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
vagrant ssh master
docker run --name hadoop --net=host -e "MASTER=true" -e "MASTER_IP=192.168.50.4" -e "SLAVES=192.168.50.1 slave1,192.168.50.2 slave2,192.168.50.3 slave3" -it mhmxs/hadoop-docker:2.6.0 /etc/bootstrap.sh -bash
Ezúttal tökéletesen lefutott a job, örülünk, de elégedettek még nem vagyunk. A jelenlegi állapot forrása itt érhető el, a Vagrantos környezeté pedig itt.

Előzetes a következő rész tartalmából. Hűsünk beállítja a Swarm clustert, majd némi küzdelem árán lecseréli a kézi DNS konfigurációt automatizált megoldásra. Vajon melyik implementációt választja? Kiderül a folytatásban...