2011. február 16., szerda

Skálázható, hibatűrő feldolgozások egyszerű kialakítása.

Ezt a cikket elsősorban azoknak az olvasóimnak szánom, akik még sosem használták az Oracle adatbázis beépített, Advanced Queueing (AQ) funkcionalitását, esetleg még nem is hallottak róla. Érdemes vele alapszinten megismerkedni, mert hasznos eszközként szolgálhat minket fejlesztéseink során.

Egyik projektemen a következő feladatot kellett megoldanom. Egy Oracle adatbázisba, egyszerű függvényhívásokon keresztül érkeznek be feldolgozandó feladatok, amelyeket egy különálló alkalmazás szerveren futó, java nyelven fejlesztett szoftver komponens fog végrehajtani. A feladatok végrehajtása aszinkron módon történik, a függvényhívás nem várja meg a feldolgozást, csak a feladat átadására szolgál, a feladatok eredményét egy táblába írja vissza a java program, amely eredmény aztán továbbításra kerül egy másik adatbázisba. A probléma ott kapcsolódik az Üzleti Intelligenciához, hogy a beérkező feladatok konkrét riport igények, a java komponens pedig PDF dokumentumokat generál az adattárház adattartalmából.

Ha ezt a feldolgozást szeretnénk párhuzamosítani, skálázhatóvá tenni, úgy kialakítani, hogy akár több különálló alkalmazás szerveren futó java komponens párhuzamosan végezze a feladatok feldolgozását, akkor a konkurens működést meg kell szerveznünk. Ha emellett még szeretnénk hibatűrővé tenni a megoldást, azaz gondoskodni arról, hogy ha a végrehajtás közben a java program elhal, attól még ne vesszen el a feladat, azt egy másik végrehajtó program automatikusan felvegye, akkor komolyan el kell gondolkodnunk, hogy ezt a vezérlést hogyan valósítsuk meg, mert számtalan problémát rejt magánban a feladat.

A fenti, egyébként komplex problémának az egyszerű megoldására képes az Advanced Queueing, mely könnyedén elbánik az aszinkron termelők, fogyasztók klasszikus problémájával. Ha belenézünk az „Advanced Queuing User's Guide and Reference” dokumentációba, akkor elsőre ijesztőnek tűnik a 480 oldalas leírás, azonban az alábbiakban bemutatom mennyire egyszerűen bevethető az Advanced Queueing a szoftveres megoldásunkban. Én magam is ezt az utat jártam végig, mikor először megismerkedtem az Advanced Queueing – gal, belenéztem a dokumentációba mely túl bonyolultnak tűnt számomra, aztán szerencsémre belefutottam egy angol nyelvű cikkbe, mely egy egyszerű példán keresztül megmutatta a funkció használatát.

Az Advanced Queueing arra képes, hogy egy tetszőleges objektum típus sorban állását kezelje, első lépésként tehát létre kell hoznunk egy objektum típust, mely a végrehajtandó feladatot reprezentálja. A fent felvázolt feladat megoldásához elég volt egy egyszerű objektum típust definiálni, mely csak egy kérés azonosítót tartalmaz.


create or replace type KERES_OBJECT as object
(
  KERES_ID NUMBER(12)
);

Ezt a típus felhasználva, a lenti függvényhívást kiadva létrehozunk egy táblát, mely a várakozási sorunkat fizikailag tárolni fogja az adatbázisban.


begin
  dbms_aqadm.create_queue_table(
    queue_table => 'KERESEK_AQ',
    queue_payload_type => 'KERES_OBJECT'
  );
end;

Erre a táblára alapulva létrehozunk egy várakozási sort, melynek az a tulajdonsága, hogy kétszer próbálkozik egy feladatot kiadni végrehajtásra, mielőtt a feladatot egy hibalistába helyezné át.


begin
  dbms_aqadm.create_queue(
    queue_name => 'KERES_Q',
    queue_table => 'KERESEK_AQ',
    max_retries => 2
  );
end;

Az én gyakorlati tapasztalatom az, hogy a kétszeri próbálkozás bőven elegendő, ha másodszorra sem sikerül a feladat végrehajtása, akkor valószínűleg valami logikai hiba van a programunkban, amely nem fog megoldódni az újrapróbálkozások következtében.

Annyi dolgunk van még hátra, hogy el kell indítanunk a frissen kreált sorunkat, amit az alábbi utasítással tehetünk meg.


begin
  dbms_aqadm.start_queue(queue_name => 'KERES_Q');
end;

Készen is vagyunk, létrehoztunk egy saját Advanced Queue – t, melyet használhatunk a feladatok kiosztásához. Két függvényt kell még elkészítenünk, egyet, mely egy feladatot berak a sorba és egyet, mely végrehajtásra felvesz egy feladatot a várakozási sorunkból.

Az alábbi két függvény kódját egy az egyben a megoldásomból illesztettem be ebbe a cikkbe. Az enq_keres eljárás a paraméterében megadott keres_id feladatot behelyezi a sorba, ahhoz, hogy ez ténylegesen meg is történjen ki kell adnunk egy commit – ot az eljáráshívás után.


procedure enq_keres(keres_id number) is
  l_enqueue_options     DBMS_AQ.enqueue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
begin
  dbms_aq.enqueue(
    queue_name          => 'KERES_Q',
    enqueue_options     => l_enqueue_options,    
    message_properties  => l_message_properties,  
    payload             => keres_object(keres_id),            
    msgid               => l_message_handle
  );
end;

A deq_keres függvény, amennyiben van végrehajtandó feladat a sorban, akkor kivesz egyet a feladatok közül, amennyiben nincs további feladat, akkor null értékkel tér vissza.


function deq_keres return number is
  l_dequeue_options     DBMS_AQ.dequeue_options_t;
  l_message_properties  DBMS_AQ.message_properties_t;
  l_message_handle      RAW(16);
  l_keres_object        keres_object;
 
  e_no_task EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_no_task, -25228);

BEGIN
  l_dequeue_options.wait := dbms_aq.NO_WAIT;
 
  begin
    DBMS_AQ.dequeue(
      queue_name          => 'EKERES_Q',
      dequeue_options     => l_dequeue_options,
      message_properties  => l_message_properties,
      payload             => l_keres_object,
      msgid               => l_message_handle
    );
  exception
    when e_no_task then return null;
  end;

  return l_keres_object.KERES_ID;
END;

A feladat feldolgozását végző algoritmust úgy kell megvalósítanunk, hogy a deq_keres függvényhívás után csak akkor kerüljön az adott session – ben commit parancs kiadásra, ha teljes mértékben, sikeresen elkészült a feladat. Ha a session – ben nem kerül commit kiadásra, akkor az AQ gondoskodik róla, hogy egy következő függvényhívás esetén a konkrét feladat újra kiosztásra kerüljön.

A fenti alig pár sornyi kódra volt szükség ahhoz, hogy kialakítsak egy aszinkron, hibatűrő, skálázható riport kiszolgálási megoldást.

Az Advanced Queueing másik klasszikus alkalmazási területe az adattárházban a feladatok ütemezése és végrehajtása. AQ használatával lehet egyszerűen olyan ütemező algoritmust készíteni, mely egy Oracle adatbázisban, háttérben futó JOB – ok számára folyamatosan osztja ki azokat a feladatokat, melyek végrehajthatóvá váltak. Egy ilyen ütemezővel lehetőségünk van arra, hogy maximális mértékben kihasználjuk a hardver adta lehetőségeket, az teljes feldolgozási ablakunk átfutási idejét a lehető leg rövidebbre csökkentsük.

Remélem sikerült felkeltenem azok érdeklődését, akik még nem használták az Advanced Queueing által nyújtott szolgáltatásokat!

Nincsenek megjegyzések:

Megjegyzés küldése