A következő címkéjű bejegyzések mutatása: Golang. Összes bejegyzés megjelenítése
A következő címkéjű bejegyzések mutatása: Golang. Összes bejegyzés megjelenítése

2017. szeptember 26., kedd

Kiterjesztések írása Go nyelvi környezetben

Munkám során elkerülhetetlenné vált, hogy kicsit közelebbről is megismerjem a Go nyelv kiterjesztési képességeit. Konkrétan azt az aspektusát, amikor egy adott problémára több megoldás/kiterjesztés is létezik, és a programunknak futás időben kell a megfelelő implementációt felhasználnia. Tipikus példája ennek, amikor ugyanazon az interfészen keresztül különböző adatbazisokhoz tudunk kapcsolódni, tehát van az általanos dolgokat tartalmazó database/sql csomag, és a gyártó specifikus működést biztosító go-sql-driver/mysql. Az én esetemben azt kellett megoldani, hogy egy felhő alapú infrastruktúra automatizációs eszköz képes legyen szolgáltató specifikus ellenőrzéseket végezni.

Első lépésben hozzunk létre egy cloud nevű packaget és definiáljuk a közös interfészt, amit meg kell valósítaniuk az egyes kiterjesztéseknek. A példában egyetlen metódus szerepel, a CloudPlugin.Validate().

type CloudPlugin interface {
Validate()
}
Ezután deklaráljunk egy map típust, ami az elérhető kiterjesztéseket gyűjti, és egy metódust amin keresztül a kiterjesztések regisztrálni tudják magukat.

type Provider string
var providers map[Provider]CloudPlugin = make(map[Provider]CloudPlugin)
func Register(provider Provider, plugin CloudPlugin) {
providers[provider] = plugin
}
Valamint írjunk pár segédfüggvényt, hogy a kliens kód is hozzáférjen a regisztrált kiterjesztésekhez.

func Providers() (resp []Provider) {
for provider := range providers {
resp = append(resp, provider)
}
return
}
func GetProvider(provider Provider) CloudPlugin {
resp := providers[provider]
return resp
}
Miután a közös résszel végeztünk, ideje egy konkrét implementaciót megvalósítani.

const OPENSTACK = cloud.Provider("OPENSTACK")
type OpenstackPlugin struct {
}
func (p *OpenstackPlugin) Validate() {
println("openstack validator")
}
Amikor a Go runtime betolt egy packaget, akkor szépen sorban meghívja a fájlokban szereplő init() metódusokat, ezt használjuk fel arra, hogy a kiterjesztés beregisztrálja magát.

func init() {
cloud.Register(OPENSTACK, new(OpenstackPlugin))
}
Nincs más dolgunk mint felhasználni a kiterjesztést, amihez szinték kihasználjuk a Go nyelv egyik adottságát. Ahogyan metódus hívásnál alulvonással helyettesithetjük azt a változót, amivel nem szeretnénk foglalkozni, az importálásnál is lehetőségünk nyílik berántani egy packaget anélkül, hogy használnánk azt.

package main
import "github.com/mhmxs/go-plugins-tutorial/cloud"
import _ "github.com/mhmxs/go-plugins-tutorial/cloud/openstack"
func main() {
for _, provider := range cloud.Providers() {
cloud.GetProvider(provider).Validate()
}
}
view raw plugin-main.go hosted with ❤ by GitHub
Az eredmény pedig:
# go run main.go
openstack validator
Egy dologról szeretnék még szót ejteni. A cloud.Register() függvényt ha közelebbről megvizsgáljuk láthatjuk, hogy egyáltalán nem szál biztos, ami a példa program esetben nem okoz gondot. Viszont ha ismeretlen forrásból érkezhet regisztráció, akkor egy Mutexel garantálnunk kell a szálbiztos működést. Ezzel a témával a Go konkurencia kezelés gyorstalpaló cikkben foglalkoztam részletesebben.
A példaprogram teljes forráskódja a hiányzó RedHat implementációval együtt megtalálható GitHubon.

2016. október 19., szerda

Go konkurencia kezelés gyorstalpaló

Mikor elkezdtem ezt a blogot írni, érdeklődésem előterében a Java alapú technológiák voltak. Rengeteg téma merült fel az eltelt 7 évben, amiről szívesen írtam volna, de mivel nem kapcsolódtak szorosan az eredeti tematikához, ezek a bejegyzések sosem kerültek implementálásra. Szakítva ezzel a (rossz) hagyománnyal szeretnék az informatika világának szélesebb spektrumával foglalkozni ezen túl. Első Java mentes bejegyzésem témájának pedig a Go nyelv konkurencia kezelését választottam.

A Go nyelv körül elég nagy a sürgés-forgás mostanában, nem tudom pontosan miért örvend ekkora népszerűségnek. Számtalan jó tulajdonsága van, de szerintem mind közül a legfontosabb szempont, hogy a nyelv egyszerű. Nincsenek mögötte "bonyolult" ideológiák mint például az objektum orientált programozásban, vagy nem lehet benne egy dolgot száz féleképpen csinálni mint mondjuk a funkcionális nyelvekben. Kutyaközönséges, de modern procedurális nyelv annak minden előnyével és hátrányával. Akit részletesebben érdekel ez a terület érdemes Rob Pike előadását meghallgatni. Témánkra rátérve ugyanez az egyszerűség jellemzi a Go konkurencia kezelését is. Vannak az un. goroutinok, ezek segítségével lehet párhozamosítani a végrehajtást, és vannak a chanelek, amiken keresztűl zajlik a kommunikáció. Utóbbiból van szinkron meg aszinkron.

package main
func main() {
c := make(chan bool)
go func() { c <- true }()
println(<-c)
}
A program létrehoz egy c szinkron chanelt, majd elindít egy goroutint, és kiolvassa a chanleből az értéket, amit a goroutin beletesz. Mivel szinkron módban működik egyik végrehajtás sem megy tovább amíg a kommunikáció meg nem történik. Aszinkron channelt a méretének megadásával tudunk létrehozni. Kész is, mindent tudunk :) Akit részletesebben érdekel, mi történik a motor-háztető alatt, annak ajánlom a Go memória kezeléséről szóló cikket. Természetesen ennyivel nem ússzuk meg a dolgot, komolyabb alkalmazásnál szükségünk van meg egy pár dologra, hogy biztosítani tudjuk programunk helyes működését. Két csomagot fogunk felhasználni, az egyik a sync, ami szinkronizációs problémákra nyújt megoldást, illetve a sync/atomic, melynek segítségével elemi műveleteket végezhetünk. A Go nyelv dokumentációját olvasva számtalan helyen olvashatjuk, hogy nincs garancia, hogy adott dolog hogyan viselkedik mikor több végrehajtó szál birizgálja egy időben (nem 1 az 1-hez a goroutine és szál leképzés, de ettől most tekintsünk el). Alapvetően érték szerint adódik át minden, így a legtöbb esetben nincs is szükség különösebb szinkronizációra, de ha mégis, magunknak kell gondoskodni róla.

Kitaláltam egy egyszerű kis feladatot, amin keresztűl bemutatok pár fontos eszközt működés közben.
  • Legyen egy szál típus, ami 1000 véletlen számot generál, és a legnagyobbat elküldi feldolgozásra.
  • Egy másik az előzőtől kapott értékből kiindulva generál még 500 véletlen számot, a legnagyobbat gyűjti egy tömbbe és tovább küldi.
  • A harmadik feldolgozó nem csinál mást, mint számolja, hogy a kapott érték hány esetben volt az utolsó a tömbben.
  • Szabályozható legyen a goroutinok száma, a generálóból hány példány futhat egyidőben, valamint hány értéket generáljon összesen.
  • Mérjük a végrehajtás idejét a generálás kezdetétől.
A feladatnak semmi értelme, és biztos egyszerűbben is meg lehetett volna oldani, de igyekeztem minél több módszert belezsúfítani a megoldásba.

Első lépésben inicializáljuk a konstansokat, valamint az eredmény tömböt. Mivel az eredménnyel több szál is fog egy időben dolgozni szükségünk lesz valami zárolási logikára. Tekintettel arra, hogy az egyik szál írja a másik pedig olvassa én a sync.RWMutex-et választottam.

package main
import (
"fmt"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
)
const (
MAX = 500 // maximum darabszám
PRODUCERS = 12 // generáló szálak
CONSUMERS = 9 // feldolgozó szálak
COUNTERS = 2 // statisztikázó szálak
)
func main() {
results := make([]int, 0) // eredmény gyűjtő
mtx := new(sync.RWMutex) // read-write lock
A következő kódblockban implementálásra kerül a statisztikázó egység.

var itsLast, notLast int32 // statisztika eredménye
countChan := make(chan int, COUNTERS) // chanel, amin a számláló várja az értékeket
counter := func() {
for {
e := <-countChan // kiolvassuk a következő értéket
mtx.RLock() // olvasásra zárolunk és kiolvassuk az eredmenyből az utolsó elemet
last := results[len(results)-1]
mtx.RUnlock()
if e != last { // eredmenytől függően elemi lépésben növeljük a megfelelő számlálót
atomic.AddInt32(&notLast, 1)
} else {
atomic.AddInt32(&itsLast, 1)
}
}
}
for i := 0; i < COUNTERS; i++ { // megadott számban elindítjuk a számlálókat
go counter()
}
Majd megírjuk a feldolgozó egységet.

appendChan := make(chan int, CONSUMERS) // chanel, amin a generálás eredményei jönek
cons := func() {
for {
e := <-appendChan // kiolvassuk a következő generált értéket
for i := 0; i < 500; i++ { // még 500szor probálkozunk nagyobbat generálni
e = int(math.Min(float64(e), float64(rand.Int())))
}
mtx.Lock() // irásra zárolunk majd hozzáfűzzük az eredméyekhez
results = append(results, e)
mtx.Unlock()
countChan <- e // tovabb küldjük az eredményt a számlálónak
}
}
for i := 0; i < CONSUMERS; i++ { // megfelelő számban elindítjuk a feldolgozót
go cons()
}
Na most kezd igazán érdekes lenni a dolog. A generátor esetében tudni kell szabályozni, hogy hány darab fut egyidőben. A legkézenfekvőbb döntés, ha létrehozunk egy chanelt adott méretben, és beleteszünk egy értéket amikor indítunk egy új generátort, és kiveszünk egyet ha végzett a generátor. Ennek eredméyeképpen amikor megtelik a chanel programunk várakozik.

poolChan := make(chan bool, PRODUCERS) // egyidőben futó generátorok nyilvántartása
prod := func() {
// ...
<-poolChan // kiolvasunk egy értéket
}
for i := 0; i < MAX; i++ { // megfelelő számban elindítjuk a generálót
poolChan <- true // beteszünk egy értéket
go prod() // elinditjuk a goroutint
}
Miután elindítottuk a generáló szálakat szeretnénk megvárni amíg az összes végez. Ennek érdekében bevezetünk egy sync.WaitGroup-ot.

prod := func(wg *sync.WaitGroup) {
defer wg.Done() // a futás végeztével nyugtázunk
// ...
}
wg := new(sync.WaitGroup) // új WaitGroup
wg.Add(MAX) // beállítjuk hány nyugtázást várunk
for i := 0; i < MAX; i++ {
// ...
go prod(wg)
}
wg.Wait() // bevárjuk az összes nyugtázást
Egy utolsó dolog maradt a generátorral kapcsolatban, hogy szeretnénk az eltelt időt mérni a lehető legközelebb a generáláshoz. Ennek biztosítására én a sync.Once-ot választottam, ami garantálja ezt a működést.

var start int64 // kezdés időbélyege
once := new(sync.Once) // ez garantálja, hogy csak egyszer adjunk értéket az időbélyegnek
poolChan := make(chan bool, PRODUCERS)
prod := func(wg *sync.WaitGroup) {
defer wg.Done()
once.Do(func() { start = time.Now().UnixNano() }) // ertéket adunk
e := rand.Int() // generálunk 1000 véletlenszámot
for i := 0; i < 999; i++ {
e = int(math.Min(float64(e), float64(rand.Int())))
}
appendChan <- e // tovább küldjük az eredményt
<-poolChan
}
wg := new(sync.WaitGroup)
wg.Add(MAX)
for i := 0; i < MAX; i++ {
poolChan <- true
go prod(wg)
}
wg.Wait()
Nincs más dolgunk mint megvárni és kiírni az eredményt.

var done bool
for !done { // várunk amíg az összeset megszámolja a számláló
done = atomic.LoadInt32(&itsLast)+atomic.LoadInt32(&notLast) == MAX
}
fmt.Printf("time spent: %dms\n", (time.Now().UnixNano()-start)/1000000)
fmt.Printf("last:not ratio: %d:%d\n", itsLast, notLast)
A teljes forráskód elérhető itt.

Programunk gyönyörűen fut, de vajon mi történik a háttérben? Fordítsuk le a programot a Go beépített verseny helyzet elemzőjével.
go build -race
Ez után futtatva valami hasonló kimenetet kapunk:
==================
WARNING: DATA RACE
Read at 0x00c42000c2e8 by main goroutine:
  main.main()
      /Users/rkovacs/asd/src/asd/main.go:92 +0x731

Previous write at 0x00c42000c2e8 by goroutine 7:
  sync/atomic.AddInt32()
      /usr/local/Cellar/go/1.7.1/libexec/src/runtime/race_amd64.s:269 +0xb
  main.main.func1()
      /Users/rkovacs/asd/src/asd/main.go:37 +0x191

Goroutine 7 (running) created at:
  main.main()
      /Users/rkovacs/asd/src/asd/main.go:42 +0x31b
==================
time spent: 1121ms
last:not ratio: 499:1
Found 1 data race(s)
Futás idejű analízisből kiderült, hogy volt értelme elemi műveletben növelni a számlálót, hiszen verseny helyzet alakult ki, és máskülönben megjósolhatatlan eredménnyel zárult volna a futás.

A téma még nem merült ki ennyivel, de remélem sikerült egy gyors képet adnom a Go konkurencia kezeléséről. Ajánlom továbbá figyelmetekbe a Concurrency is not parallelism és a Go concurrency patterns előadásokat.

2016. szeptember 30., péntek

Üzenet hitelesítése Java és Go szervizek között

Java fejlesztés mellett időm egy részét Go programozással töltöm, és egy olyan feladaton volt szerencsém dolgozni, amely mindkét platformot érintette. Napjaink modern alkalmazásai kisebb szervízekre vannak bontva, és igen gyakori, hogy az egyes szervízek eltérő technológiával kerülnek implementálásra. Konkrét esetben az volt az elvárás, hogy a szervízek közti kommunikációt aláírással hitelesítsem, a küldő fél Javaban, míg a fogadó Goban írodott. Mivel nem valami egzotikus kérést kellett megvalósítani gondoltam másoknak is hasznos lehet a megoldás. Előljáróban még annyit kell tudni a rendszer architektúrájáról, hogy a Java kód indít virtuális gépeket, és az ezeken a gépeken futó Go szolgáltatáson keresztül végez beállítási műveleteket, ráadásul mindkét komponens nyílt forráskódú. Ezen két adottságból adódóan nem volt mód sem szimetrikus titkosítást használni, vagy egyéb más érzékeny adatot eljuttatni a futó virtuális gépre, sem pedig valami közös "trükköt" alkalmazni. Maradt az aszinkron kulcspárral történő aláírás, mi az RSA-t választottuk. Nem is szaporítanám a szót, ugorjunk fejest a kódba.

Kezdjük a fogadó féllel. A Go nyelv dokumentációját olvasva hamar ráakadhatunk, hogy létezik beépített crypto/rsa csomag. Nem bővelkedik a lehetőségekben, ugyanis csak PKCS#1-et támogat. Remélem nem spoiler, de a Go lesz a szűk keresztmetszet választható sztenderdek közül. Létezik persze külső csomag pl. PKCS#8 támogatással, de mi a biztonsági kockázatát kisebbnek ítéltük a beépített bár gyengébb eljárásnak, mint a külső kevesek által auditált megoldásnak. A crypto/rsa csomagnál maradva az egyetlen lehetőségünk, hogy PSS (Probabilistic signature scheme) aláírásokat hitelesítsünk a VerifyPSS metódussal. Szóval nincs más dolgunk mint az RSA kulcspár publikus részét eljuttatni a virtuális gépre, és már mehet is a hitelesítés.

// rawSign - base64 encoded representation of the signiture
// pubPem - public key in PEM format
// data - the signed data
func CheckSignature(rawSign string, pubPem []byte, data []byte) bool {
var err error
var sign []byte
var pub interface{}
sign, err = base64.StdEncoding.DecodeString(rawSign)
if err == nil {
block, _ := pem.Decode(pubPem)
if block != nil {
pub, err = x509.ParsePKIXPublicKey(block.Bytes)
if err == nil {
newHash := crypto.SHA256.New()
newHash.Write(data)
opts := rsa.PSSOptions{SaltLength: 20} // Java default salt length
err = rsa.VerifyPSS(pub.(*rsa.PublicKey), crypto.SHA256, newHash.Sum(nil), sign, &opts)
}
}
}
return err == nil
}
view raw check-sign.go hosted with ❤ by GitHub

Küldés során a kérés teljes törzsét írtuk alá, így nincs más dolgunk mint a kérésből kibányászni a törzset és ellenőrizni a hitelességét.
func Wrap(handler func(w http.ResponseWriter, req *http.Request), signatureKey []byte) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := new(bytes.Buffer)
defer r.Body.Close()
ioutil.ReadAll(io.TeeReader(r.Body, body))
r.Body = ioutil.NopCloser(body) // We read the body twice, we have to wrap original ReadCloser
signature := strings.TrimSpace(r.Header.Get("signature"))
if err := CheckSignature(signature, signatureKey, body.Bytes()); err != nil {
// Error handling
w.WriteHeader(http.StatusNotAcceptable)
w.Write([]byte("406 Not Acceptable"))
return
}
http.HandlerFunc(handler).ServeHTTP(w, r)
})
}
view raw wrapper.go hosted with ❤ by GitHub

Valamint implementálni és regisztrálni a kérés feldolgozót.
func PostItHandler(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("ok"))
}
func RegisterHandler() {
signature, _ := ioutil.ReadFile("/path/of/public/key")
r := mux.NewRouter()
r.Handle("/postit", Wrap(PostItHandler, signature)).Methods("POST")
http.Handle("/", r)
http.ListenAndServe("8080", nil)
}

Természetesen tesztet is írtam az aláírás ellenőrzésére.
type TestWriter struct {
header http.Header
status int
message string
}
func (w *TestWriter) Header() http.Header {
return w.header
}
func (w *TestWriter) Write(b []byte) (int, error) {
w.message = string(b)
return len(b), nil
}
func (w *TestWriter) WriteHeader(s int) {
w.status = s
}
func TestWrapAllValid(t *testing.T) {
pk, _ := rsa.GenerateKey(rand.Reader, 1024)
pubDer, _ := x509.MarshalPKIXPublicKey(&pk.PublicKey)
pubPem := pem.EncodeToMemory(&pem.Block{Type: "PUBLIC KEY", Headers: nil, Bytes: pubDer})
content := "body"
newHash := crypto.SHA256.New()
newHash.Write([]byte(content))
opts := rsa.PSSOptions{SaltLength: 20}
sign, _ := rsa.SignPSS(rand.Reader, pk, crypto.SHA256, newHash.Sum(nil), &opts)
body := bytes.NewBufferString(content)
req, _ := http.NewRequest("GET", "http://valami", body)
req.Header.Add("signature", base64.StdEncoding.EncodeToString(sign))
writer := new(TestWriter)
writer.header = req.Header
handler := Wrap(func(w http.ResponseWriter, req *http.Request) {}, pubPem)
handler.ServeHTTP(writer, req)
if writer.status != 0 {
t.Errorf("writer.status 0 == %d", writer.status)
}
}

Miután megvagyunk a hitelesítéssel jöhet az aláírás Java oldalon. Kutattam egy darabig hogyan lehet PSS aláírást Java SE-vel generálni, de mivel a projektünknek már része volt a Bouncy Castle Crypto API, így kézenfekvő volt, hogy azt használjam fel.
// privateKeyPem - private key in PEM format
// data - data to signature
public static String generateSignature(String privateKeyPem, byte[] data) {
try (PEMParser pEMParser = new PEMParser(new StringReader(clarifyPemKey(privateKeyPem)))) {
PEMKeyPair pemKeyPair = (PEMKeyPair) pEMParser.readObject();
KeyFactory factory = KeyFactory.getInstance("RSA");
X509EncodedKeySpec publicKeySpec = new X509EncodedKeySpec(pemKeyPair.getPublicKeyInfo().getEncoded());
PublicKey publicKey = factory.generatePublic(publicKeySpec);
PKCS8EncodedKeySpec privateKeySpec = new PKCS8EncodedKeySpec(pemKeyPair.getPrivateKeyInfo().getEncoded());
PrivateKey privateKey = factory.generatePrivate(privateKeySpec);
KeyPair kp = new KeyPair(publicKey, privateKey);
RSAPrivateKeySpec privKeySpec = factory.getKeySpec(kp.getPrivate(), RSAPrivateKeySpec.class);
PSSSigner signer = new PSSSigner(new RSAEngine(), new SHA256Digest(), 20); // be sure we use defautl salt lenght
signer.init(true, new RSAKeyParameters(true, privKeySpec.getModulus(), privKeySpec.getPrivateExponent()));
signer.update(data, 0, data.length);
byte[] signature = signer.generateSignature();
return BaseEncoding.base64().encode(signature);
} catch (NoSuchAlgorithmException | IOException | InvalidKeySpecException | CryptoException e) {
throw new RuntimeException(e);
}
}
private static String clarifyPemKey(String rawPem) {
return "-----BEGIN RSA PRIVATE KEY-----\n" + rawPem.replaceAll("-----(.*)-----|\n", "") + "\n-----END RSA PRIVATE KEY-----"; // PEMParser nem kedveli a sortöréseket
}

A Java oldali kulcspár generálással tele van az internet, azzal nem untatnák senkit.