Использование WinApi для создания пула потоков

Форум для изучающих FPC и их учителей.

Модератор: Модераторы

Re: Использование WinApi для создания пула потоков

Сообщение Python » 01.10.2018 23:24:02

Я могу сказать, что существует встроенный системный пул. Называется "порт завершения ввода-вывода". Создаётся:
Код: Выделить всё
fCompletion:=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,Length(fThreads));

Задачи в этот порт пихаются:
Код: Выделить всё
PostQueuedCompletionStatus(fCompletion,0,0,POverlapped(Data));

А рабочий поток получает их так:
Код: Выделить всё
while(GetQueuedCompletionStatus(fCompletion,T,K,Ovr,INFINITE))do begin
  .. // выполнение реальной работы
end;

Тем не менее, несмотря на то, что у меня такой вариант вполне себе работает (в многопоточной загрузке файлов из интернета), я не могу сказать, что на 100% точно уверен, как это работает. Потому, у меня используется другой вариант, который мне более понятен:
Код: Выделить всё
// Реализует пул потоков на базе массива
// Пример функции потока (одной из пула):
// function Foo(P:pointer):integer;
// var
//   Data:PTaskInfo absolute P;
//   Obj:TObject;
// begin
//   repeat
//     Obj:=Data.Queue.Pop;
//     if Obj=nil then break;
//     ... выполняем нашу работу
//   until false;
//   Result:=0;
// end;

unit ThreadQueue;

interface

Uses
  Windows;

Type
  TThreadQueue=class;
  // это запись, которая передаётся каждому потоку в пуле
  PTaskInfo=^TTaskInfo;
  TTaskInfo=record
    ThreadInfo:pointer;
    ThreadNo:integer;
    Queue:TThreadQueue;
  end;

  // собственно, пул
  TThreadQueue=class
  private
    fThreads:array of cardinal;
    fTasks:array of TObject;
    fMaxTask:integer;
    fSemaphore,fKillEvent,fPauseEvent:cardinal;
    fRealObjs:boolean;
    fCritical:TRTLCriticalSection;
  public
    // создаёт пул. Параметры:
    // ThreadCount - количество потоков в пуле. Должно быть не меньше 2 (иначе нет смысла в пуле)
    // TaskCount - максимальное количество задач в буфере... можно ставить довольно большим, например, 1000
    // ThreadFunc - функция потока (на базе которой будут созданы все потоки пула)
    // aThreadInfo - дополнительная информация, передаваемая пулу
    // TasksAreRealObj - если установлена в true, деструктор вызовет .Free для всех задач, которые остались в буфере
    // НЕ устанавливайте его в true, если собиратесь кормить потоки фиктивными объектами, например, TObject(1)
    Constructor Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
    Destructor Destroy;override;
    // функция только для потока. Вызывайте её, чтобы получить новую задачу.
    // если задач нет - функция автоматически уходит в ожидание
    function Pop(Wait:cardinal=INFINITE):TObject;
    // функция только для вызывающего приложения. Вызывайте её, чтобы отдать пулу новую задачу.
    // НЕ пытайтесь положить в пул задачу nil.
    function Push(S:TObject):boolean;
    // останавливает выдачу задач в пул
    procedure SetPaused(Pause:boolean);
  end;

implementation

{ TThreadQueue }

Constructor TThreadQueue.Create(ThreadCount,TaskCount:integer;ThreadFunc:TThreadFunc;aThreadInfo:pointer=nil;TasksAreRealObj:boolean=true);
var
  I:integer;
  P:PTaskInfo;
  ThId:cardinal;
begin
  InitializeCriticalSection(fCritical);
  fRealObjs:=TasksAreRealObj;
  // сообщение выставляется при завершении работы
  fKillEvent:=CreateEvent(nil,true,false,nil);
  // максимальная длина очереди запросов. Сигнал, если есть сообщения.
  if ThreadCount>10 then ThreadCount:=10;
  if ThreadCount<2 then ThreadCount:=2;
  if TaskCount>1000 then fMaxTask:=1000
  else if TaskCount<ThreadCount then
    fMaxTask:=ThreadCount
  else
    fMaxTask:=TaskCount;
  // изначально, семафор обнулён, никто задания не получит
  fSemaphore:=CreateSemaphore(nil,0,fMaxTask,nil);
  // изначально пул НЕ стоит на паузе
  fPauseEvent:=CreateEvent(nil,true,true,nil);
  // генерируем потоки. Пока они могут инициализировать свои внутренние структуры,
  // а потом встанут колом на Pop, потому что задач пока нет.
  SetLength(fThreads,ThreadCount);
  For I:=Low(fThreads) to High(fThreads) do begin
    New(P);
    P^.Queue:=self;
    P^.ThreadInfo:=aThreadInfo;
    P^.ThreadNo:=I;
    fThreads[i]:=BeginThread(nil,64*1024,ThreadFunc,P,0,ThId);
  end;
end;

Destructor TThreadQueue.Destroy;
var
  LockTime,CurTime:cardinal;
  S:string;
  I:integer;
begin
   // сообщаем потокам, что надо бы добровольно откинуть копыта
  SetEvent(fKillEvent);
  // снимаемся с паузы, на случай, если это было так
  SetEvent(fPauseEvent);
  // максимальное время ожидания - 2 секунды... пока хардкод
  // в течении этого времени ВСЕ потоки должны откинуться
  LockTime:=GetTickCount+2000;
  for I:=Low(fThreads) to High(fThreads) do begin
    CurTime:=GetTickCount;
    if LockTime>CurTime then
      CurTime:=LockTime-CurTime
    else
      CurTime:=0;
    // ожидаем завершения конкретного потока
    if WaitForSingleObject(fThreads[I],CurTime)<>WAIT_OBJECT_0 then begin
       // выводим предупреждение, если поток откидываться добровольно не стал
      Str(I,S);
      OutputDebugString(PChar('Killed thread number '+S));
      TerminateThread(fThreads[I],1);
    end;
    CloseHandle(fThreads[I]);
  end;
  // типовое освобождение ресурсов
  if fRealObjs then
    For I:=Low(fTasks) to High(fTasks) do
      fTasks[i].Free;
  CloseHandle(fKillEvent);
  CloseHandle(fSemaphore);
  CloseHandle(fPauseEvent);
  DeleteCriticalSection(fCritical);
end;

procedure TThreadQueue.SetPaused(Pause:boolean);
begin
  if Pause then
    ResetEvent(fPauseEvent)
  else
    SetEvent(fPauseEvent);
end;

function TThreadQueue.Pop(Wait:cardinal=INFINITE):TObject;
var
  Events:array[0..1]of cardinal;
  I:integer;
begin
  // ждём снятия с паузы
  if WaitForSingleObject(fPauseEvent,Wait)=WAIT_TIMEOUT then begin
    Result:=nil;
    exit;
  end;
  events[0]:=fKillEvent;
  events[1]:=fSemaphore;
  // ждём снятия с семафора, что означает - есть задачи
  if WaitForMultipleObjects(2,@Events,false,Wait)<>WAIT_OBJECT_0+1 then begin
    Result:=nil;
    exit;
  end;
  // входим в критическую секцию, нам нужно отредактировать список задач
  EnterCriticalSection(fCritical);
  if Length(fTasks)<=0 then begin
    // такого быть не должно, но что-то подсказывает...
    OutputDebugString('No tasks, but sema in signal state');
    LeaveCriticalSection(fCritical);
    Result:=nil;
    exit;
  end;
  // выталкиваем первую задачу в очереди и смещаем остальные в конец
  Result:=fTasks[0];
  For I:=Low(fTasks) to High(fTasks)-1 do
    fTasks[i]:=fTasks[i+1];
  SetLength(fTasks,pred(Length(fTasks)));
  // всё
  LeaveCriticalSection(fCritical);
end;

function TThreadQueue.Push(S:TObject):boolean;
begin
  // nil добавлять нельзя, это признак смерти потока
  if S=nil then begin
    Result:=false;
    exit;
  end;
  // входим в критическую секцию, нам надо отредактировать перечень задач
  EnterCriticalSection(fCritical);
  if Length(fTasks)>=fMaxTask then begin
    LeaveCriticalSection(fCritical);
    Result:=false;
    exit;
  end;
  // добавляем задачу в конец
  SetLength(fTasks,succ(Length(fTasks)));
  fTasks[High(fTasks)]:=S;
  LeaveCriticalSection(fCritical);
  // поднимаем семафор. Если есть готовые к исполнению потоки - они тут же получат задачу
  ReleaseSemaphore(fSemaphore,1,nil);
  Result:=true;
end;

end.

Собственно, этот вариант использования в моих проектах используется чаще. Но многопоточную отработку я применяю в основном когда ограничен по вводу-выводу. Например, чтение/запись файлов, загрузка из интернета. При попытке распараллелить работу с данными я чаще встречался с проблемами и потому предпочитал работать в один поток (даже если это и не GUI поток, а отдельный, но один).
В задаче про поиск студентов по группам я вижу только один вариант параллеленья: пусть у нас 4 потока, делим весь массив на 4 части, каждую отдаём своему потоку, ждём завершения всех, результат суммируем. Какого-то пула потоков тут я не вижу. Пул требуется когда заранее число исполняемых задач неизвестно, а порождение и уничтожение потока в Windows (да и Linux, наверное) - штука довольно дорогая, потому имеет смысл держать несколько "горячих" потоков, которые будут исполнять задачи без непрерывного порождения/смерти.
Python
новенький
 
Сообщения: 16
Зарегистрирован: 23.01.2018 21:50:17

Re: Использование WinApi для создания пула потоков

Сообщение olegy123 » 02.10.2018 12:03:03

Python писал(а): порождение и уничтожение потока в Windows (да и Linux, наверное) - штука довольно дорогая
По секрету скажу, что это есть главная задача ОС. Поэтому там оптимизировано по максимуму. Сейчас на подходе ОС которые уже могут не только создавать и убивать потоки, выделять кванты времени, но также их контролировать.
Держать несколько "горячих" потоков не совсем правильно выигрыш близок к nil. Если действительно хотите хайп словить то стоит поработать с оптимизацией данных(скорость в кешах L1, с работа памятью уже очень затратная). Работа с новомодными инструкциями SSE/AVX - но они нацелены на групповые работы, соответственно данные должны быть упакованы, с разовыми данными выигрыш падает.
По любому нужно дергать флажки компилятора, Или уходить в ASM.
olegy123
энтузиаст
 
Сообщения: 1129
Зарегистрирован: 25.02.2016 12:10:20

Re: Использование WinApi для создания пула потоков

Сообщение runewalsh » 04.10.2018 12:26:13

Python
Асинхронный I/O, в частности, I/O completion ports — это не про потоки. По сути это отдельный пул выполняющихся операций ввода/вывода, которым в идеале может не соответствовать ни одного потока.
Аватара пользователя
runewalsh
постоялец
 
Сообщения: 411
Зарегистрирован: 27.04.2010 00:15:25

Пред.

Вернуться в Обучение Free Pascal

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 5

Рейтинг@Mail.ru