Максимальное разумное количество работающих потоков .

Вопросы программирования и использования среды Lazarus.

Модератор: Модераторы

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 13.04.2025 22:34:55

Все три фазы в многопоточном режиме !
Код: Выделить всё
Тест загрузки списка 3
MT Mode Попытки 1
Время 0,38 c
Время 0,44 c
Время 1,29 c


Для сравнения "Однопоточный режим"
Код: Выделить всё
Тест загрузки списка 3
Время 0,86 c
Время 0,93 c
Время 3,54 c

(Правда сделал "по своему" и в последней фазе немного схитрил (отказался от ожидания) так что крайний "рекорд"(общее время загрузки 30-ти картинок 1,29 c ) малость сомнительный (общее время "с ожиданием" примерно тоже что и у однопоточного режима отличия в пределах погрешности измерений ) ) Но тем не менее небольшой выигрыш есть . (впрочем это касается именно "трёхфазной " загрузки сравнения с "однофазными" заметно веселее )

Код: Выделить всё
Тест загрузки списка 2
Однопоточный
Время 10,63 c i 29 c 30 
Тест загрузки списка 2
Многопоточный
Время 8,58 c i 28 c 30 

(Если увеличить количество потоков результат будет лучше но ценой заметного тормоза программы и всей системы )

Малый "стресс тест " ( 84 файла)
Код: Выделить всё
Тест загрузки списка 3
MT Mode Попытки 1
Время 0,08 c
Время 0,22 c
Время 10,14 c
Тест загрузки списка 2
Многопоточный
Время 22,15 c  84 ф
(Большой стресс-тест "MT Mode 3.0" пока не проходит (но думаю я знаю причину - нет запуска потоков " по частям" (как это уже сделано в "MT Mode 2.0"))
Последний раз редактировалось Alex2013 17.04.2025 23:19:49, всего редактировалось 2 раз(а).
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение sts » 15.04.2025 14:19:13

надо еще проверить файлмаппинг на память, самый быстрый доступ к содержимому файла
sts
постоялец
 
Сообщения: 451
Зарегистрирован: 04.04.2008 12:15:44
Откуда: Тольятти

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 17.04.2025 22:51:31

sts писал(а):надо еще проверить файлмаппинг на память, самый быстрый доступ к содержимому файла

Это мысль ! :idea:
Но у меня сейчас "красивая теория убита безобразным опытом" ...
Там где "трехфазный" однопоточный код "работает как часы" многопоточный упирается рогом на всех трех фазах.
(Поочереди отключал МП-режим для каждой но "большой стресс-тест" (около гигабайта файлов) убивает все )
И органичнее количества активных потоков к сожалению не помогает...
Зы
"Спросил у ясеня" (тобишь у ИИ ) и он предсказуемо "забросал листвой"...
Но кое-что полезное точно есть, например мануал по завершению "зависших" потоков.
Код: Выделить всё
Как безопасно завершить "повисший" поток в Lazarus/Free Pascal
Завершение зависшего потока — сложная задача, потому что принудительное убийство (TerminateThread) может привести к утечкам памяти, повреждению данных или непредсказуемым ошибкам.
Но есть безопасные стратегии для корректного завершения:

1. Мягкое завершение (рекомендуется)
Использование Terminate + проверка флага в потоке
Поток должен регулярно проверять Terminated и завершаться сам.
Пример:
pascal
Copy
type
  TMyThread = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TMyThread.Execute;
begin
  while not Terminated do // Проверяем флаг завершения
  begin
    // Рабочий код...
    if SomeCondition then Break; // Выход по условию
    Sleep(10); // Даём потоку "передышку"
  end;
end;
Как завершить:
pascal
Copy
MyThread.Terminate;  // Устанавливает флаг Terminated
MyThread.WaitFor;    // Ждём завершения (можно с таймаутом)
Плюсы:
? Безопасно
? Нет утечек ресурсов
Минусы:
? Если поток "завис" в внешнем вызове (например, HTTP-запрос), флаг Terminated не поможет.

2. Принудительное завершение (опасно, но иногда необходимо)
Вариант 1: TerminateThread (Windows API)
pascal
Copy
uses
  Windows;

// ОПАСНО! Может вызвать утечки памяти.
TerminateThread(MyThread.Handle, 0);
Проблемы:
    • Может оставить заблокированные мьютексы/критические секции.
    • Не освобождает память и хендлы.
Вариант 2: Через TThread.FreeOnTerminate
Если поток завершится аварийно, но FreeOnTerminate := True, он автоматически уничтожится.
pascal
Copy
MyThread.FreeOnTerminate := True;
MyThread.Terminate;
Но: Это не гарантирует немедленного завершения.

3. Гибридный подход (таймаут + принудительное завершение)
Шаг 1: Мягкое завершение (Terminate + WaitFor)
pascal
Copy
MyThread.Terminate;
if MyThread.WaitFor(5000) = wrTimeout then // Ждём 5 сек
begin
  // Поток не ответил, завершаем принудительно
  TerminateThread(MyThread.Handle, 0);
  MyThread.Free;
end;
Шаг 2: Защита ресурсов
Если поток использует критические секции, файлы или соединения, добавьте try..finally:
pascal
Copy
procedure TMyThread.Execute;
begin
  try
    while not Terminated do
    begin
      // Код...
    end;
  finally
    CloseHandle(SomeResource); // Освобождаем ресурсы
  end;
end;

4. Альтернатива: Изоляция потоков (лучшая практика)
Если поток выполняет HTTP-запросы, чтение файлов или другие долгие операции, вынесите их в отдельный процесс (например, через TProcess).
pascal
Copy
uses
  Process;

var
  Proc: TProcess;
begin
  Proc := TProcess.Create(nil);
  try
    Proc.Executable := 'my_helper_tool.exe';
    Proc.Options := [poNoConsole];
    Proc.Execute;
   
    // Ждём завершения с таймаутом
    if Proc.WaitForExit(5000) then
      Memo1.Lines.Add('Успешно завершено')
    else
    begin
      Proc.Terminate(0); // SIGTERM (мягкое завершение)
      Sleep(1000);
      if Proc.Running then
        Proc.Kill(1);    // SIGKILL (принудительное)
    end;
  finally
    Proc.Free;
  end;
end;
Плюсы:
✔ Процессы изолированы, краш одного не влияет на основной поток.
✔ Можно убить без утечек в основном приложении.

Итоговые рекомендации

Код: Выделить всё
Способ                   Безопасность          Применимость
----------------------------------------------------------------------------------------
Terminate + WaitFor   ✅ Высокая             Лучший вариант для чистого кода
TerminateThread       ❌ Опасен              Только в крайних случаях!
Изоляция в TProcess   ✅ Максимальная        Для HTTP/долгих операций


Лучший вариант:
1. Проверяйте Terminated в потоке в долгих циклах.
2. Используйте WaitFor с таймаутом.
3. Если поток не отвечает – попробуйте закрыть его ресурсы и только потом TerminateThread.
4. Для HTTP/опасных операций – выносите в TProcess. Если сделаете так, зависание потоков будет минимальным!

Краткий чек-лист для безопасного завершения потоков
1. Всегда проверяйте Terminated в долгих операциях.
2. Используйте WaitFor(таймаут) вместо бесконечного ожидания.
3. Закрывайте ресурсы (файлы, соединения) в finally.
4. Избегайте TerminateThread — только если поток реально завис и не реагирует.
5. Для HTTP/опасных задач — лучше TProcess.
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 19.04.2025 09:33:17

Баг победил... Но есть одна беда "победил" перебором вариантов... и главное не понял почему я его победили ...
1 Более мнение понятно Оказалось что нужно ограничивать количество СОЗДАННЫХ но незпущенных потоков. (Хотя странно, по идее созданный но незапущенный поток просто обычный объект/класс в памяти )
2 Непонятно совсем.
Если передача параметров происходит через конструктор все в порядке, а если конструктор создает поток в остановленном состоянии и я просто записываю данные прямо в поля потока, а потом запускаю нет. (Причем "глюки" идут какие-то "плавающие" то работает то нет )

ЗЫ
При этом если создавать поток в остановленном состоянии, и сразу запускать БЕЗ "ПРЯМОЙ" ПЕРЕДАЧИ ПАРАМЕТРОВ все в порядке.
Код: Выделить всё
For I:=0 to C-1 do begin
T:= My_Thread.Create( True,....);
T.Start;
  Run_Test;// тут ограничение  количества одновременно работающих потоков
end;
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 07.05.2025 04:03:54

xchgeaxeax писал(а):Может лучше так?
Код: Выделить всё
type
  PLoadableFunc = function (ASender: TObject; out AFileName: String): Boolean;
  PLoadableFileRecord = ^TLoadableFileRecord;
  TLoadableFileRecord = packed record
    fName: String;
    fData: TMemoryStream;
    Fail: LongInt; // -1 файл прочитан, 0 - ошибка чтения файла
  end;
  TLoadableFileThread = class(TThread)
  private
    FLoad: TList;
    FName: String;
    FNext: PLoadableFunc;
    FWork: Boolean;
    function GetCount: LongInt;
    function GetData(Index: LongInt): PLoadableFileRecord;
    procedure GetNext;
  protected
    procedure Execute; override;
  public
    constructor Create(fnGetNextFileName: PLoadableFunc);
    destructor Destroy; override;
    property Count: LongInt read GetCount;
    property Data[Index: LongInt]: PLoadableFileRecord read GetData;
  end;

constructor TLoadableFileThread.Create(fnGetNextFileName: PLoadableFunc);
begin
  inherited Create(False);
  FreeOnTerminate := True;
  FList := nil;
  FName := EmptyStr;
  FNext := fnGetNextFileName;
  FWork := False;
end;

function TLoadableFileThread.GetCount: LongInt;
begin
  Result := FList.Count;
end;

function TLoadableFileThread.GetData(Index: LongInt): PLoadableFileRecord;
begin
  Result := FList[Index]; // Можете конечно проверить дополнительно Index но это и так ни к чему. Так и так получите Exception, если такого Index нет.
end;

procedure TLoadableFileThread.GetNext;
begin
  FWork := FNext(Self, FName); // Синхронно вызываем, чтобы получить новое имя, а если имен нет, тогда и этого же вызова и забираем дынные...
end;

procedure TLoadableFileThread.Execute;
var
  AFile: String;
  PFile: PLoadableFileRecord;
  i: LongInt;
begin
  FList := TList.Create;
  while not Terminated do begin
    Synchronize(@GetNext);
    if FWork then begin
      New(PFile);
      with PFile^ do begin
        fName := AFile;
        fData := TMemoryStream.Create;
        Fail := 10; // 10 попыток прочитать, если нужно.
        while Fail > 0 do begin try
          fData.LoadFromFile(AFile);
          Fail := -1;
        except
          dec(Fail);
        end;
      end;
      FList.Add(PFile);
    end else Sleep(1000); // Не важно сколько. Просто ветка должна подождать до её завершения т.к. файлов больше не будет
  end;
  for i := 0 to FList.Count - 1 do with PLoadableFileRecord(FList[i])^ do begin
    FreeAndNil(fData);
    Dispose(PLoadableFileRecord(FList[i]));
  end;
  FreeAndNil(FList);
end;

var
  aLoadders: array [0 .. 3] of TLoadableFileRecord;
  aFileNames: TStringList;

function EnumerateList(ASender: TObject; out AFileName: String): Boolean;
var
  S: String;
begin
  S := aFileNames.Pop();
  Result := (aFileNames.Count >= 0) and (Trim(S) <> EmptyStr);
  AFileName := S;
  if not Result then with ASender as TLoadableFileThread do begin
    // Тут можно безопасно брать результаты работы ветви
    // Нет необходимости копировать в дополнительный TMemoryStream, а можно сразу запускать преобразование из JPEG в BMP
    for i := 0 to Count - 1 do with Data[i]^ do begin
     
    end;
    Terminate; // Ну а дальше ветка умрет через 1 сек и почистит за собой память
  end;
end;

begin
  aFileNames := TStringList.Create;
  aFileNames.LoadFromFile('FilesToLoad.txt');
  for i := Low(aLoadders) to High(aLoadders) do aLoadders[i] := TLoadableFileThread.Create(@EnumerateList);
  for i := 0 to 999999 do Sleep(100); // Что-то делаем, пока загружается
end.


PS печатал просто в браузере. Если есть очепятки, тогда поправьте. Но думаю принцип вы поймете.


Ради "завершения тестирования " проверил этот код (причем как почти "исходный" ( с моими правками) так и дописанный с помощью DeepSeek ). результат " куда-то грузит" но обработка и освобождение "в пролете" ... ( пишет "завешаю поток 0 " и зацикливается ) Вообщем если есть рабочая версия этого кода интересно на неё взглянуть. (Основные идеи понятны написать аналог пожалуй смогу но интересно понять где я напортачил в правках и адаптации этого варианта (есть несколько мыслей которые можно проверить но все это уже малость утомляет ) ) :idea:
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение xchgeaxeax » 07.05.2025 11:21:23

Код: Выделить всё
program FileLoader;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads,
  {$ENDIF}
  {$IFDEF HASAMIGA}
  athreads,
  {$ENDIF}
  Classes, SysUtils;

const
  iThreadWaitTimeout = 1000;

type
  PLoadableFunc = function (ASender: TObject; out AFileName: String): Boolean;
  PLoadableFileRecord = ^TLoadableFileRecord;
  TLoadableFileRecord = packed record
    fName: String;
    fData: TMemoryStream;
    Fail: LongInt; // -1 файл прочитан, 0 - ошибка чтения файла
  end;

  { TLoadableFileThread }

  TLoadableFileThread = class(TThread)
  private
    FFile: String;
    FLoad: TList;
    FNext: PLoadableFunc;
    FText: String;
    FTick: Int64;
    FWork: Boolean;
    function GetCount: LongInt;
    function GetData(Index: LongInt): PLoadableFileRecord;
    procedure GetNext;
    function GetText: String;
  protected
    procedure Execute; override;
  public
    constructor Create(const AName: String; fnGetNextFileName: PLoadableFunc);
    destructor Destroy; override;
    property Count: LongInt read GetCount;
    property Data[Index: LongInt]: PLoadableFileRecord read GetData;
    property Name: String read GetText;
    property Tick: Int64 read FTick write FTick;
  end;

constructor TLoadableFileThread.Create(const AName: String;
  fnGetNextFileName: PLoadableFunc);
begin
  inherited Create(True);
  FreeOnTerminate := False;
  FFile := EmptyStr;
  FLoad := nil;
  FNext := fnGetNextFileName;
  FText := AName;
  FTick := 0;
  FWork := False;
end;

destructor TLoadableFileThread.Destroy;
begin
  if not Terminated then Terminate;
  inherited Destroy;
end;

function TLoadableFileThread.GetCount: LongInt;
begin
  Result := FLoad.Count;
end;

function TLoadableFileThread.GetData(Index: LongInt): PLoadableFileRecord;
begin
  Result := FLoad[Index]; // Можете конечно проверить дополнительно Index но это
                          // и так ни к чему. Так и так получите Exception, если
                          // такого Index нет.
end;

procedure TLoadableFileThread.GetNext;
begin
  FWork := FNext(Self, FFile); // Синхронно вызываем, чтобы получить новое имя,
                               // а если имен нет, тогда и этого же вызова и
                               // забираем дынные...
end;

function TLoadableFileThread.GetText: String;
begin
  Result := FText;
end;

procedure TLoadableFileThread.Execute;
var
  PFile: PLoadableFileRecord;
  i: LongInt;
begin
  FLoad := TList.Create;
  while not Terminated do begin
    { Synchronize для программы с графическим интерфейсом в главной нити }
    GetNext; // Synchronize(@GetNext);
    if FWork then begin
      WriteLn(FText, ': загружаю файл ' + FFile);
      New(PFile);
      with PFile^ do begin
        fName := FFile;
        fData := TMemoryStream.Create;
        Fail := 10; // 10 попыток прочитать, если нужно.
        while Fail > 0 do try
          fData.LoadFromFile(fName);
          Fail := -1;
        except
          dec(Fail);
        end;
        if Fail = 0 then WriteLn(FText, ': файл не загружен ' + FFile)
                    else WriteLn(FText, ': файл успешно загружен ' + FFile);
      end;
      FLoad.Add(PFile);
    end else Sleep(iThreadWaitTimeout); // Не важно сколько. Просто ветка должна
                                        // подождать до её завершения т.к.
                                        // файлов больше не будет
  end;
  WriteLn(FText, ': начало очистки памяти перед завершением нити...');
  for i := 0 to FLoad.Count - 1 do with PLoadableFileRecord(FLoad[i])^ do begin
    FreeAndNil(fData);
    Dispose(PLoadableFileRecord(FLoad[i]));
  end;
  FreeAndNil(FLoad);
  FreeOnTerminate := True;
  WriteLn(FText, ': нить завершена.');
end;

var
  aFileNames: TStringList;
  aMemoryList: TMemoryStream;
  x: TRTLCriticalSection;
  aStartTickCount: Int64;

function EnumerateList(ASender: TObject; out AFileName: String): Boolean;
var
  S: String;
  i: LongInt;
  z: Int64;
begin
  (ASender as TLoadableFileThread).Tick := GetTickCount64 - aStartTickCount;
  EnterCriticalSection(x);
  S := aFileNames.Pop();
  Result := (aFileNames.Count >= 0) and (Trim(S) <> EmptyStr);
  AFileName := S;
  with ASender as TLoadableFileThread do if not Result then begin
    // Тут можно безопасно брать результаты работы ветви
    // Нет необходимости копировать в дополнительный TMemoryStream, а можно
    // сразу запускать преобразование из JPEG в BMP
    WriteLn(Name, ': больше нет файлов для загрузки...');
    z := aMemoryList.Size;
    for i := 0 to Count - 1 do with Data[i]^ do if Fail = -1 then begin
      WriteLn(Name, ': передаю на обработку загруженные данные из файла ' + fName);
      fData.Position := 0;
      aMemoryList.CopyFrom(fData, fData.Size);
    end;
    WriteLn(Name, ': загрузила ', aMemoryList.Size - z, ' байт.');
    WriteLn(Name, ': Время ', GetTickCount64 - aStartTickCount, ' мс (без ожидания в CriticalSection = ', Tick, ')');
    Terminate; // Ну а дальше ветка умрет через 1 сек и почистит за собой память
  end;
  LeaveCriticalSection(x);
end;

procedure StartLoading(const AFileName: String);
var
  aLoadders: array [0 .. 3] of TLoadableFileThread;
  i: LongInt;
begin
  aStartTickCount := GetTickCount64;
  aFileNames.LoadFromFile(AFileName);
  for i := Low(aLoadders) to High(aLoadders) do begin
    aLoadders[i] := TLoadableFileThread.Create('Thread' + i.ToString, @EnumerateList);
    aLoadders[i].Start;
  end;
end;

begin
  InitCriticalSection(x);
  aMemoryList := TMemoryStream.Create;
  aFileNames := TStringList.Create;
  StartLoading('FilesToLoad.txt');
  while aFileNames.Count > 0 do Sleep(100); // Что-то делаем, пока загружается
  Sleep(iThreadWaitTimeout + 1000);
  WriteLn('All done.');
  ReadLn;
  FreeAndNil(aFileNames);
  FreeAndNil(aMemoryList);
  DoneCriticalSection(x);
end.
Вот. Я оформил как обычную программу и исправил опечатки, что допустил при наборе в браузере. Проверил у себя. Можете тестировать.

Код: Выделить всё
...
Thread2: файл успешно загружен /home/xchgeaxeax/Картинки/1102.jpg
Thread2: загружаю файл /home/xchgeaxeax/Картинки/1
Thread3: файл успешно загружен /home/xchgeaxeax/Картинки/173357802127397918_c0dfd993_460x816.mp4
Thread3: больше нет файлов для загрузки...
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/рождество.jpg
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/понедельник-пятница.jpg
Thread0: файл не загружен /home/xchgeaxeax/Картинки/Frontier Developments
Thread1: файл не загружен /home/xchgeaxeax/Картинки/Criterion Games
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/озоновая_дыра.jpg
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/нервная система.jpg
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/направление.jpg
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/модули.png
Thread2: файл не загружен /home/xchgeaxeax/Картинки/1
...
Thread3: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/173357802127397918_c0dfd993_460x816.mp4
Thread3: загрузила 19313402 байт.
Thread3: Время 1069 мс (без ожидания в CriticalSection = 904)
Thread1: больше нет файлов для загрузки...
...
Thread1: загрузила 19301123 байт.
Thread1: Время 1107 мс (без ожидания в CriticalSection = 953)
Thread0: больше нет файлов для загрузки...
...
Thread0: загрузила 20764068 байт.
Thread0: Время 1151 мс (без ожидания в CriticalSection = 953)
Thread2: больше нет файлов для загрузки..
...
Thread2: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/1111.jpg
Thread2: передаю на обработку загруженные данные из файла /home/xchgeaxeax/Картинки/1102.jpg
Thread2: загрузила 16210926 байт.
Thread2: Время 1180 мс (без ожидания в CriticalSection = 1026)
Thread3: начало очистки памяти перед завершением нити...
Thread3: нить завершена.
Thread1: начало очистки памяти перед завершением нити...
Thread1: нить завершена.
Thread0: начало очистки памяти перед завершением нити...
Thread0: нить завершена.
Thread2: начало очистки памяти перед завершением нити...
Thread2: нить завершена.
All done.
Но я тут выделяю память по необходимости. Скорее всего будет быстрее, если посчитать размер и выделить буфер памяти сразу ну и загружать в него без дополнительного копирования забирая куски буфера вместе с именем файла. Но тогда надо будет список файлов дополнить их длиной.
xchgeaxeax
постоялец
 
Сообщения: 187
Зарегистрирован: 11.05.2023 03:51:40

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 07.05.2025 20:19:33

Спасибо ! :idea: ( я вроде разобрался, функция EnumerateList в моей версии объявлена внутри обработчика кнопки и это путает стек при внешнем вызове несмотря на явное объявление far ну и деструктор в потоке неверно дописал Но оригинальная авторская версия всё равно наверняка лучше будет, так что непременно попробую «адаптировать» её к своему «тестовому стенду». )
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 10.05.2025 23:17:14

Чуть не спятил при отладке но код запустил ( теперь его нужно почистить и оптимизировать )

"Жит парад" методов загрузки... ["Средний тест" 88 картинок ]
Код: Выделить всё
------------------------------------------------------------------------------------------------
Тест загрузки списка 1
Однопоточный
Время 59,27 c

Тест загрузки списка 1
Многопоточный
Время 16,92 c

Тест загрузки списка 2
Однопоточный
Время 61,40 c i 87 c 88 
Тест загрузки списка 2
Многопоточный
Время 17,03 c i 85 c 88 

Тест загрузки списка 3
Фаза 0 Однопоточный  режим
Время 0,05 c
Время 0,28 c
Время 16,03 c
Тест загрузки списка 3
Лог запуска по частям
39 40
78 39

Время 0,21 c
Фаза 1 MT Mode
Лог
39 40
78 39

Время 0,60 c
Лог
39 40
78 39
Время 11,18 c

!! NEW !! =====================================!

Тест загрузки списка 4
Запуск загрузки...
Поток 8272 завершает работу
C 21
Обработка 21 загруженных изображений...
Поток 9404 завершает работу
C 21
Обработка 21 загруженных изображений...
Поток 12244 завершает работу
C 23
Обработка 23 загруженных изображений...
Поток 9484 завершает работу
C 23
Обработка 23 загруженных изображений...
Время 20,33 c
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение xchgeaxeax » 11.05.2025 00:08:34

Вместо TStringList.pop() можно использовать индекс, который должен достигнуть TStringList.Count, чтобы все это завершило работу. Так не придется удалять строки из списка и, возможно, можно будет отказаться от долгой синхронизации.

Код: Выделить всё
var
  aFileNames: TStringList;
  iFileNames: LongInt = 0; // Не забываем устанавливать этот 0 перед запуском потоков

function EnumerateList(ASender: TObject; out AFileName: String): Boolean;
  function PostInc(var aValue: LongInt): LongInt; inline;
  begin
    Result := aValue; inc(aValue);
  end;
var
  S: String;
  i: LongInt;
  z: Int64;
begin
  (ASender as TLoadableFileThread).Tick := GetTickCount64 - aStartTickCount;
  EnterCriticalSection(x);
  Result := iFileNames < aFileNames.Count;
  if Result then begin
    S := aFileNames[PostInc(iFileNames)];
    Result := (aFileNames.Count >= 0) and (Trim(S) <> EmptyStr);
  end;
  with ASender as TLoadableFileThread do if not Result then begin
    // Тут можно безопасно брать результаты работы ветви
    // Нет необходимости копировать в дополнительный TMemoryStream, а можно
    // сразу запускать преобразование из JPEG в BMP
    WriteLn(Name, ': больше нет файлов для загрузки...');
    z := aMemoryList.Size;
    for i := 0 to Count - 1 do with Data[i]^ do if Fail = -1 then begin
      WriteLn(Name, ': передаю на обработку загруженные данные из файла ' + fName);
      fData.Position := 0;
      aMemoryList.CopyFrom(fData, fData.Size);
    end else AFileName := S;
    WriteLn(Name, ': загрузила ', aMemoryList.Size - z, ' байт.');
    WriteLn(Name, ': Время ', GetTickCount64 - aStartTickCount, ' мс (без ожидания в CriticalSection = ', Tick, ')');
    Terminate; // Ну а дальше ветка умрет через 1 сек и почистит за собой память
  end;
  LeaveCriticalSection(x);
end;


Добавлено спустя 7 минут 30 секунд:
Но, думаю, еще лучше было бы сразу попытаться раскидать по ветвям файлы с учетом их длин. Чтобы каждая ветвь грузила примерно равное число байт. Именно эта идея лежала в основе жадного алгоритма для получения мен файлов. Хотя они и не упорядочиваются по размеру, но ветви не будут брать еще одно имя из списка пока не загрузят текущий файл. Таким образом пускай и с погрешностью, но размеры файлов распределятся более-менее равномерно. Хотя, возможен вариант, при котором будет в конце списка один большой файл на 2 Гб, а все файлы перед ним по 60-100 кб. Тогда одна ветвь явно будет работать дольше остальных. Но и даже сортировка тут мало чем поможет. Разве что переставит его в начало и он займет загрузкой один поток сразу, а не после загрузки всей мелочевки.

Добавлено спустя 25 минут 25 секунд:
С другой стороны. Если и сортировать файлы, то не просто по объему, но и по примерному времени загрузки. Т.е. желательно учесть скорость чтения файлов с устройства хранения. А скорости могут сильно отличаться. Для файлов с SSD.M2 и обычного HDD скорости явно будут не сравнимы (не говоря уже о сетевых файлах и просто забывая про файлы с НГМД).
xchgeaxeax
постоялец
 
Сообщения: 187
Зарегистрирован: 11.05.2023 03:51:40

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 11.05.2025 14:32:06

Моя урезанная и "адаптированная" под GUI версия "Свойства "(property) вещь красивая но головняка добавила изрядно.
(Попытка писать поле-property в мемо иногда приводит к ну очень странный эффектам )
Поток...
Код: Выделить всё
const
  iThreadWaitTimeout = 1000;
var
  aFileNames: TStringList;
   aFileNamesInd: TStringList;//дубль для получения индекса (Через IndexOf ).
(Индекс в исходном списке  нужен для отрисовки "элементов  мозаики" в  правильном порядке   )

type
  PLoadableFunc = function (ASender: TObject; out AFileName: String): Boolean;
  PLoadableFileRecord = ^TLoadableFileRecord;
  TLoadableFileRecord = packed record
    fName: String;
    fData: TMemoryStream;
    Ind,Fail: LongInt; // -1 файл прочитан, 0 - ошибка чтения файла
    FromInternet: Boolean;
  end;


{ TLoadableFileThread }

TLoadableFileThread = class(TThread)
private
  FFile: String;
  fName: String;
  FList: TList;
  FNext: PLoadableFunc;
  FWork: Boolean;
  procedure GetNext;

protected
  procedure Execute; override;
public

  constructor Create( fnGetNextFileName: PLoadableFunc);
  destructor Destroy; override;
  function LoadFromInternet(const AURL: String; AStream: TMemoryStream): Boolean;
end;

function TLoadableFileThread.LoadFromInternet(const AURL: String; AStream: TMemoryStream): Boolean;
var
  HTTPClient: TFPHTTPClient;
begin
  Result := False;
If not InternetConnected then exit;
  HTTPClient := TFPHTTPClient.Create(nil);
  try
    try
      HTTPClient.Get(AURL, AStream);
      AStream.Position := 0;
      Result := True;
    except
      on E: Exception do
        LSIForm.Memo3.Lines.Add('Ошибка загрузки: ' + AURL + ' - ' + E.Message);
    end;
  finally
    HTTPClient.Free;
  end;
end;


constructor TLoadableFileThread.Create(
fnGetNextFileName: PLoadableFunc);
begin
inherited Create(True);
FreeOnTerminate := False;
FFile := EmptyStr;
FNext := fnGetNextFileName;
FName := '';
FList := TList.Create;
FWork := False;

end;

destructor TLoadableFileThread.Destroy;
begin
FreeOnTerminate:=True;
if not Terminated then Terminate;
inherited Destroy;
end;

procedure TLoadableFileThread.GetNext;
begin
FWork := FNext(Self, FFile); // Синхронно вызываем, чтобы получить новое имя,
                             // а если имен нет, тогда и этого же вызова и
                             // забираем дынные...
end;
procedure TLoadableFileThread.Execute;
var
PFile: PLoadableFileRecord;
i: LongInt;
begin
FList := TList.Create;
while not Terminated do begin
  // Synchronize для программы с графическим интерфейсом в главной нити
  //GetNext;
  Synchronize(GetNext);
  if FWork then begin

    New(PFile);
    with PFile^ do begin
      fName := FFile;
      fData := TMemoryStream.Create;
       FromInternet := (Pos('http://', LowerCase(FName)) > 0) or
                         (Pos('https://', LowerCase(FName)) > 0);

                Fail := 3; // 3 попытки прочитать

          if FromInternet then
          begin
            // Загрузка из интернета
            if LoadFromInternet(FName, fData) then
              Fail := -1
            else
              Fail := 0;
          end
          else begin

      Fail :=  3; //3 попытки прочитать, если нужно.
      while Fail > 0 do try
        fData.LoadFromFile(fName);
        Fail := -1;
      except
         dec(Fail);
         sleep(20)
      end;
      end;
      if Fail = 0 then FreeAndNil(fData);// !
    end;
    FList.Add(PFile);
  end else Sleep(iThreadWaitTimeout); // Не важно сколько. Просто ветка должна
                                      // подождать до её завершения т.к.
                                      // файлов больше не будет
end;
for i := 0 to Flist.Count - 1 do with PLoadableFileRecord(FList[i])^ do begin
  FreeAndNil(fData);
  Dispose(PLoadableFileRecord(FList[i]));
end;
FreeAndNil(FList);
FreeOnTerminate := True;
end;


Функция EnumerateList ...
Код: Выделить всё
function EnumerateList(ASender: TObject; out AFileName: String): Boolean;
var
  S: String;
  i,C,Ind: LongInt;
  Thread: TLoadableFileThread;
begin

  Result := False;
  AFileName := '';

  if aFileNames.Count > 0 then
  begin
  EnterCriticalSection(x);
    S := aFileNames.Strings[0];  aFileNames.Delete(0);
// без лишней "популяризации" методов из питона (ИМХО) лучше 

    Result := aFileNames.Count>0;
    AFileName := S;

LeaveCriticalSection(x); // Важно ! (GUI внутри  CriticalSection не работает )

  end else
  begin
    Thread :=  TLoadableFileThread(ASender);
     LSIForm.Memo3.Lines.Add( ' Поток ' + IntToStr(Thread.ThreadID) + ' завершает работу');
     C:=Thread.FList.Count;
     LSIForm.Memo3.Lines.Add(' C '+C.ToString);

    // Обработка загруженных данных
    if   C > 0 then
    begin
     LSIForm.Memo3.Lines.Add('Обработка ' + IntToStr(C) + ' загруженных изображений...');

      // Здесь можно преобразовывать изображения, например:
      for i := 0 to C - 1 do
      with PLoadableFileRecord(Thread.FList[i])^ do begin
      if (Thread.FList[i]<>Nil) and  (  Fail = -1) then

        begin
          // Обработка успешно загруженного изображения
            fData.Position := 0;
                OneDraw(fName, fData); // конверсия  и отрисовка
       end else OneDraw(fName, nil );;
      end
     end;

    Thread.FreeOnTerminate:=True;
    Thread.Terminate;
   end;
end;

Пусковая процедура ...
Код: Выделить всё
procedure StartLoading;
var
  aLoadders: TLoadableFileThread;
  i: LongInt;
begin
  aStartTickCount := GetTickCount64;

i := 1;
Repeat
    aLoadders := TLoadableFileThread.Create( @EnumerateList);
    aLoadders.Start;
Inc(I);
until I>aLoadders.ProcessorCount;
end;


Зы
Работа сетью еще не отлажена но это по сравнению с "более другими проблемами" отладки уже "мелочи жизни" ... :idea:
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение xchgeaxeax » 11.05.2025 15:31:01

Код: Выделить всё
constructor TLoadableFileThread.Create(
fnGetNextFileName: PLoadableFunc);
begin
inherited Create(True);
FreeOnTerminate := False;
FFile := EmptyStr;
FNext := fnGetNextFileName;
FName := '';
FList := TList.Create; // Этот конструктор будет выполняться в создающей нити.
{ Я не люблю создавать объекты в одной нити, чтобы потом использовать их в другой.
  Это может вызывать проблемы. Поэтому предпочитаю объекты создавать в начале
  execute, а в конструкторе присваиваю nil. Так объект будет создаваться и
  использоваться в одной и той же нити. }
FWork := False;

end;

...

procedure TLoadableFileThread.Execute;
var
PFile: PLoadableFileRecord;
i: LongInt;
begin
FList := TList.Create; // И вот у вас уже утечка памяти из-за не освобожденного экземпляра объекта TList


Код: Выделить всё
function EnumerateList(ASender: TObject; out AFileName: String): Boolean;
var
  S: String;
  i,C,Ind: LongInt;
  Thread: TLoadableFileThread;
begin

  Result := False;
  AFileName := '';

  if aFileNames.Count > 0 then
  begin
  EnterCriticalSection(x); // При использовании Syncronize это не требуется. Двойная синхронизация может приводить к самоблокировке или цикличной блокировке.
    S := aFileNames.Strings[0];  aFileNames.Delete(0);
// без лишней "популяризации" методов из питона (ИМХО) лучше

    Result := aFileNames.Count>0;
    AFileName := S;

LeaveCriticalSection(x); // Важно ! (GUI внутри  CriticalSection не работает )

  end else
  begin
    Thread :=  TLoadableFileThread(ASender);
     LSIForm.Memo3.Lines.Add( ' Поток ' + IntToStr(Thread.ThreadID) + ' завершает работу');
     C:=Thread.FList.Count;
     LSIForm.Memo3.Lines.Add(' C '+C.ToString);

    // Обработка загруженных данных
    if   C > 0 then
    begin
     LSIForm.Memo3.Lines.Add('Обработка ' + IntToStr(C) + ' загруженных изображений...');

      // Здесь можно преобразовывать изображения, например:
      for i := 0 to C - 1 do
      with PLoadableFileRecord(Thread.FList[i])^ do begin
      if (Thread.FList[i]<>Nil) and  (  Fail = -1) then

        begin
          // Обработка успешно загруженного изображения
            fData.Position := 0;
                OneDraw(fName, fData); // конверсия  и отрисовка
       end else OneDraw(fName, nil );;
      end
     end;

    Thread.FreeOnTerminate:=True;
    Thread.Terminate;
   end;
end;
xchgeaxeax
постоялец
 
Сообщения: 187
Зарегистрирован: 11.05.2023 03:51:40

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 12.05.2025 01:32:02

По конструктору FList := TList.Create; моя пактика и опыт подсказывают мне прямо прямо противоположное : все что можно создать один раз желательно создавать один раз тем более если это происходит внутри отдельной "нити-потока". Дело тут не только в том что создание и освобождение объекта/класса требует заметного времени а еще и в том что выделение памяти ВНЕШНЯЯ функция (по сути это потенциально чревато теми же проблемами что обращение к GUI тем более если объект/класс имеет сложный конструктор и/или деструктор )
Но начет утечки верно перед "повторным использованием" нужно например не забыть делать FList.Clear ( это тоже не совсем "чистый прием" но он менее критичен (суть проблемы кроме прочего еще в том что даже в рамках "одной нити-потока" не все что написано последовательно последовательно исполняется если где-то в стеке останется ссылка на "затертый" объект/класс ничего хорошего из этого не выйдет))

Зы
О "критической секции" это (ИМХО) не "блокировка", а просто страховка от "повторного исполнения" участка кода до его завершения (которое может произойти при очень разных ситуациях).
Последний раз редактировалось Alex2013 15.05.2025 13:11:01, всего редактировалось 1 раз.
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение xchgeaxeax » 12.05.2025 06:49:11

Alex2013 писал(а):Но начет утечки верно перед "повторным использованием" нужно например не забыть делать FList.Clear

Даже если вы высвободите данные, которых и так нет, то структуры класса, которые создаются и заполняются при Create, это тоже блок динамической памяти. При повторном Create без Free у вас будет утечка памяти именно этой структуры, а не дынных из TList, которых и не было. А в данном случае вы в одной переменной сохраняете в двух разных нитях два разных экземпляра TList не освободив при этом предыдущего.

Функции выделения памяти как раз могут использовать менеджеры, привязываемые к нитям. Поэтому я без особой необходимости и избегаю создания объектов вне текущей нити. А у вас тут вообще выбрасывается старая не освобожденная ссылка на старый экземпляр объекта, созданный в основной нити, при повторной записи в переменную в функции дополнительной нити.

Alex2013 писал(а):О "критической секции" это (ИМХО) не "блокировка", а просто страховка от "повторного исполнения" участка кода до его завершения ( которое может произойти при очень разных ситуациях ).

И что же за такую побочную сверхполезную работу должна выполнять нить, если ей нельзя повторно зайти в секцию? Даже ветвь с очередью сообщений не перейдет к их обработке, если проверка очереди не реализуется в функции ожидания при блокировке. А CriticalSection как раз не занимается проверкой очереди сообщений, но вот Syncronize это делает. Поэтому то, теоретически, код после Synchronize может повторно попытаться исполнить участок с CriticalSection. И вот уже блокировка от нее окончательно уйдет в бесконечное ожидание для нити - она повиснет.

Добавлено спустя 2 часа 43 минуты 31 секунду:
ADD: Как вариант для сортировки списка файлов могу предложить следующие критерии. Сначала надо упорядочить файлы по устройству загрузки, после по объему от большего к меньшему. Таким образом потоки будут выбирать файлы с наименьшей погрешностью по времени загрузки (разница окончания работы потоков не будет сильно отличаться по времени) и каждый поток загрузит примерно одинаковое количество байт с хорошим средним временем. Ну и все же уйти от удаления из списка 0 элемента в пользу переменной с индексом в списке.
xchgeaxeax
постоялец
 
Сообщения: 187
Зарегистрирован: 11.05.2023 03:51:40

Re: Максимальное разумное количество работающих потоков .

Сообщение Alex2013 » 12.05.2025 15:43:16

Все режимы "Четвертичного периода " работают .
Код: Выделить всё
Тест загрузки списка 4
Кэширование ОТКЛ
Однопоточный  режим
Net mode
Время 32,58 c

Тест загрузки списка 4
Кэширование ВКЛ
Однопоточный  режим
Net mode
Время 13,25 c i 29 c 30 
Время 13,26 c

Тест загрузки списка 4
Кэширование ВКЛ
Однопоточный  режим
File mode
Время 2,61 c

Тест загрузки списка 4
Кэширование ВКЛ
MT Mode
Запуск загрузки...
Поток 1048 завершает работу
C 6
Обработка 6 загруженных изображений...
Поток 388 завершает работу
C 9
Обработка 9 загруженных изображений...
Поток 3708 завершает работу
C 7
Обработка 7 загруженных изображений...
Поток 7248 завершает работу
C 7
Обработка 7 загруженных изображений...
Время 5,64 c
Тест загрузки списка 4
Кэширование ОТКЛ
MT Mode
Запуск загрузки...
Поток 6024 завершает работу
C 13
Обработка 13 загруженных изображений...
Поток 6204 завершает работу
C 0
Поток 6908 завершает работу
C 1
Обработка 1 загруженных изображений...
Время 27,30 c


Режим "Без кэширования" это ультимативная "без компромиссная" загрузка из интернета .
"File mode" ультимативное отсутствие интернета (на самом деле просто отладчик процедуры OneDraw )
И все бы ничего но "третья версия" быстрее и стабильнее...

Код: Выделить всё
Тест загрузки списка 3
Кэширование ОТКЛ
Фаза 0 Однопоточный  режим
Время 15,41 c
Фаза 1
Время 24,72 c
Фаза 2
Время 27,33 c
Тест загрузки списка 3
Кэширование ВКЛ
Фаза 0 Однопоточный  режим
Время 11,10 c
Фаза 1
Время 11,41 c
Фаза 2
Время 14,00 c

Тест загрузки списка 3
Кэширование ОТКЛ
Фаза 0 MT Mode
Лог запуска по частям
9 10
18 10
27 10

Время 11,02 c
Фаза 1 MT Mode
Лог
9 10
18 10
27 10

Время 18,47 c
Фаза 2 MT Mode
Лог
9 10
18 9
27 9

Время 20,62 c
Тест загрузки списка 3
Кэширование ВКЛ
Фаза 0 MT Mode
Лог запуска по частям
9 10
18 10
27 10

Время 0,74 c
Фаза 1 MT Mode
Лог
9 10
18 10
27 9

Время 0,91 c
Фаза 2 MT Mode
Лог
9 10
18 9
27 9

Время 3,02 c

В моей версии как обычно, для каждой загрузки выделяется отдельная нить + есть 3 фазы (получение размеров+выделение памяти , чтение в большой линейный массив МемориСтрим, обработка отрисовка )

+Есть дополнительная акселерация при увлечении количества запущенных нитей-потоков ( текущая версия программы держит на 4-ядерном процессоре примерно 90 нитей, но это уже не безопасно но 40-50 работают нормально ничего не тормозит )
Код: Выделить всё
Тест загрузки списка 3
Кэширование ОТКЛ
Фаза 0 MT Mode
Лог запуска по частям

Время 10,94 c
Фаза 1 MT Mode
Лог

Время 14,68 c
Фаза 2 MT Mode
Лог

Время 16,67 c
Тест загрузки списка 3
Кэширование ВКЛ
Фаза 0 MT Mode
Лог запуска по частям

Время 0,47 c
Фаза 1 MT Mode
Лог

Время 0,59 c
Фаза 2 MT Mode
Лог

Время 2,54 c


В общем пока "вдавить педаль в пол" с помощью предложенного кода увы не получилось + есть трудно локализуемые нестабильности.
(Мой опыт говорит что чем меньше времени работает параллельная нить-поток тем меньше шанс словить "нелокальную нестабильность " + потоков может быть заметно большие чем ядер и "аппаратных вычислительных каналов " что удивительно это дает заметный выигрыш в скорости параллельной обработки ... расчет стабильного количества примерно "Число Ядер CPU" *10 )
.

ЗЫ
По моему идеальный метод загрузки подобных мозаичных-галерей был-бы при "ленивой загрузке по требованию" непосредственно при отрисовке видимых элементов с кэшированием и динамическим освобождением от всего лишнего .К сожалению реализация такого простого на первый взгляд способа постоянно оказывается неожиданно громоздкой .Так что приходится проявлять разнообразные "чудеса гибридизации". :idea:

Добавлено спустя 1 час 58 минут 59 секунд:
xchgeaxeax писал(а):и каждый поток загрузит примерно одинаковое количество байт с хорошим средним временем.

Кстати это мысль ! Можно просто грузить одинаковые блоки причем даже если файл меньшего или большого размера
(Просто процедура чтения должна "склеивать" блоки читая их из разных файлов)
Alex2013
долгожитель
 
Сообщения: 3140
Зарегистрирован: 03.04.2013 11:59:44

Re: Максимальное разумное количество работающих потоков .

Сообщение xchgeaxeax » 12.05.2025 17:50:30

Alex2013 писал(а):Кстати это мысль ! Можно просто грузить одинаковые блоки причем даже если файл меньшего или большого размера
(Просто процедура чтения должна "склеивать" блоки читая их из разных файлов)

Только там не просто один объем, а одинаковое время загрузки. Согласитесь, что прочитать 1 Мб c FDD, CD, DVD, HDD, SDD, Flash, Net будет разная скорость. Вот суть моего подхода сделать балансировку выбора файлов. А не подсовывать всем потокам уже готовые списки без даже примерного представления о скорости загрузки каждого из файлов. Ну а время это объем/скорость. Так и балансируется.
xchgeaxeax
постоялец
 
Сообщения: 187
Зарегистрирован: 11.05.2023 03:51:40

Пред.След.

Вернуться в Lazarus

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 222

Рейтинг@Mail.ru
cron