een lange titel

Labo C++11 concurrency

C++11 thread, async en andere

C++11 kent een aantal nieuwe technieken om concurrency in een programma in te bouwen.

Dit zijn een aantal voorbeelden gebaseerd op de volgende labotekst.

vb1.cpp, een eenvoudige thread

In dit voorbeeld wordt een thread gestart. Met join() kan je op het einde van de thread wachten. De naam van de uit te voeren functie wordt als een parameter aan de constructor van std::thread meegegeven. In C en ook C++ doet de naam van de functie dienst als adres van de functie.

Broncode: vb1.cpp

#include <iostream>
#include <thread>

void func(int x)
{
   std::cout << "in thread " << x << "\n";
}

int main()
{
   std::thread th(func, 100);
   th.join();

   std::cout << "thread afgelopen\n";
   return 0;
}

vb2.cpp, gemeenschappelijk geheugen schrijven

Hier worden meerdere threads gestart die elk een globale variabele wijzigen. Dit is een gevaarlijk situatie omdat er race-effecten kunnen ontstaan waardoor het eindresultaat fout kan zijn.

Al de threads worden in een vector opgeslagen zodat achteraf een join() op elke thread kan uitgevoerd worden.

Broncode: vb2.cpp

#include <iostream>
#include <vector>
#include <thread>

int accum = 0;

void square(int x)
{
   std::cout << "in thread " << x << "\n";
   accum += x*x;
}

int main()
{
   std::vector<std::thread> ths;
   for (int i=1; i<=20; i++)
   {
      ths.push_back(std::thread(square, i));
   }

   for (auto &th: ths)
   {
      th.join();
      std::cout << "thread afgelopen\n";
   }

   std::cout << "accum = " << accum << "\n";
   return 0;
}

vb3.cpp, thread en mutex

Dit is de vorige versie uitgebreid met een mutex. Hierdoor wordt het race-effect vermeden. Elke thread doet eerst een lock() voordat de globale variabele gewijzigd wordt. Achteraf gebeurt dan een unlock().

Broncode: vb3.cpp

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

int accum = 0;
std::mutex accum_mutex;

void square(int x)
{
   int temp = x*x;
   accum_mutex.lock();
   accum += temp;
   accum_mutex.unlock();
}

int main()
{
   std::vector<std::thread> ths;
   for (int i=1; i<=20; i++)
   {
      ths.push_back(std::thread(square, i));
   }

   for (auto &th: ths)
   {
      th.join();
      std::cout << "thread afgelopen\n";
   }

   std::cout << "accum = " << accum << "\n";
   return 0;
}

vb4.cpp, thread en atomic

Deze oplossing werkt niet met een mutex maar wel met een atomic. Ook hier is de toegang beveiligd met wederzijdse uitsluiting.

Broncode: vb4.cpp

#include <iostream>
#include <vector>
#include <thread>
#include <atomic>

std::atomic<int> accum(0);

void square(int x)
{
   std::cout << "in thread " << x << "\n";
   accum += x*x;
}

int main()
{
   std::vector<std::thread> ths;
   for (int i=1; i<=20; i++)
   {
      ths.push_back(std::thread(square, i));
   }

   for (auto &th: ths)
   {
      th.join();
      std::cout << "thread afgelopen\n";
   }

   std::cout << "accum = " << accum << "\n";
   return 0;
}

vb5.cpp, async

Dit voorbeeld demonstreert het gebruik van async(). Hiermee kan je een bewerking, die een resultaat moet geven als een thread starten.

In de main() staan twee oproepen:

  • De eerste staat in commentaar, hier is het niet zeker of er wel een aparte thread gestart wordt.
  • De tweede oproep heeft als parameter een constante launch::async die aangeeft dat er meteen een thread moet gestart worden.

Met get() kan je het resultaat van de berekening opvragen.

Broncode: vb5.cpp

#include <iostream>
#include <future>
#include <chrono>

int square(int x)
{
   std::this_thread::sleep_for(std::chrono::milliseconds(100));
   return x*x;
}

int main()
{
   //auto a = std::async(square, 10);
   auto a = std::async(std::launch::async, square, 10);
   int v = a.get();

   std::cout << "v = " << v << "\n";
   return 0;
}

vb6.cpp, conditievariabele

Door een conditievariabele in te zetten kan een thread wachten op een bepaalde voorwaarde. In het voorbeeld wacht de reporter thread tot een waarde in value is ingesteld.

De assigner thread stelt de waarde in.

Door het gebruik van de conditievariabele krijg je wel polling maar deze pollingtechniek reageert snel op een wijziging van de voorwaarde zonder dat er veel CPU verbruik is.

Broncode: vb6.cpp

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>

std::condition_variable cond_var;
std::mutex m;

int main()
{
   int value = 10;
   bool notified = false;

   std::thread reporter([&]()
   {
      std::unique_lock<std::mutex> lock(m);
      while (!notified)
      {
          cond_var.wait(lock);
      }
      std::cout << "value is " << value << "\n";
   });

   std::thread assigner([&]()
   {
      value = 20;
      notified = true;
      cond_var.notify_one();
   });

   reporter.join();
   assigner.join();
   return 0;
}

vb7.cpp, consumer-producer probleem

Dit is een klassiek probleem dat met een conditievariabele is opgelost. De conditievariabele en mutex maken nu deel uit van de klasse Goods. Ze zijn daar datamembers. Er is ook een datamember q, dit is de wachtrij.

Er zijn twee methoden:

  • push()

    Deze methode plaatst een element op de wachtrij. Er hoeft niet gewacht te worden. Met notify_one() wordt de andere thread gewekt.

  • pop()

    Deze methode wacht op een element. Als de wachtrij leeg is, moet er gewacht worden. Dit gebeurt met wait().

Naast de conditievariabele is er bij dit mechanisme nog een mutex. Deze mutex zit verpakt in een lock_guard variabele.

{
   std::lock_guard<std::mutex> lock(m);
   q.push(x);
}

Deze variabele staat in een eigen {} block. Hierdoor lopen automatisch de constructor en de destructor van de lock_guard. Deze zorgen respectievelijk voor de lock() en unlock van de mutex.

In de main() functie worden de producer en consumer elk als een thread gestart.

Broncode: vb7.cpp

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>

class Goods
{
private:
   std::condition_variable cond_var;
   std::mutex              m;
   std::queue<int>         q;

public:
   Goods()
   {
   }
   void push(int x)
   {
      {
         std::lock_guard<std::mutex> lock(m);
         q.push(x);
      }
      cond_var.notify_one();
   }
   int pop()
   {
      std::unique_lock<std::mutex> lock(m);
      while (q.empty())
      {
         cond_var.wait(lock);
      }
      int v = q.front();
      q.pop();
      return v;
   }
};

int main()
{
   int n = 20;
   Goods goods;

   std::thread producer([&]()
   {
      for (int i=0; i<n; i++)
      {
         std::cout << "i push " << i << "\n";
         goods.push(i);
      }
   });

   std::thread consumer([&]()
   {
      for (int i=0; i<n; i++)
      {
         int v = goods.pop();
         std::cout << "v pop " << v << "\n";
      }
   });

   producer.join();
   consumer.join();
   return 0;
}

vb8.cpp promise-future

In dit voorbeeld worden promise en future gedemonstreerd. Deze constructie kan je gebruiken om een resultaat van een asynchroon lopende bewerking op te vragen. Het resultaat wordt na de berekening met set_value() in de promise geplaatst. De promise is een globale variabele die als een parameter aan de lambdafunctie wordt meegegeven.

Met get_future() wordt eerst een future opgehaald en in deze future kan met get() het uiteindelijke resultaat opgevraagd worden.

Broncode: vb8.cpp

#include <iostream>
#include <future>


int main()
{
   std::promise<int> prms;

   auto th = std::thread(
      [](std::promise<int> &prms, int x) -> int 
      { 
         prms.set_value(2*x); 
      },
      std::ref(prms), 15);

   th.join();
   auto ftr = prms.get_future();
   int r = ftr.get();

   std::cout << "r " << r << "\n";

   return 0;
}

vb9.cpp async en future

In dit voorbeeld is de promise/future verborgen in een async. Deze async geeft meteen een future terug. Dit is de variabele ftr. Met deze future kan je het resultaat ophalen. Hiervoor moet je de get() methode gebruiken.

Broncode: vb9.cpp

#include <iostream>
#include <future>

int main()
{
   auto ftr = std::async(
      [](int x) -> int 
      { 
         return 2*x; 
      },
      15);

   int r = ftr.get();
   std::cout << "r " << r << "\n";

   return 0;
}

Besluit

Met de nieuwe C++11 constructies wordt multithreading mogelijk in C++ in een stijl die aansluit met de recentere vernieuwingen zoals lambdafuncties. Deze laatste zorgen ervoor dat ook het maken van threads op een eenvoudige en leesbare manier kan geschreven worden in C++.