Как правильно передать строку в работающий поток?

Общие вопросы программирования, алгоритмы и т.п.

Модератор: Модераторы

Как правильно передать строку в работающий поток?

Сообщение VirtUX » 18.06.2021 17:00:54

Есть поток с "бесконечным" циклом. На каждой итерации цикла, помимо основных действий, еще нужно проверять строку на наличие данных (то есть: fDataString.Length > 0) и обрабатывать эти данные, - которые должны попадать из главного потока.
Вопрос: обязательно использовать критические секции? Если - да, то как лучше?

Как я себе представляю это:

Код: Выделить всё
TMyThread = class(TThread)
  private
    fCS: TCriticalSection;
    fDataString: string;
  protected
    procedure Execute; override;
  public
    procedure SetDataString(S: string);
    constructor Create;
    destructor Destroy; override;
  end;

{...}

procedure TMyThread.Execute;
begin
  while not Terminated do begin
    {... any other do ...}
    fCS.Enter; // Блок обработки внешних данных
      if (fDataString.Length > 0) then begin
        {... any do ...};
        fDataString:= '';
      end;
    fCS.Leave;
    {... any other do ...}
    sleep(1);
  end;
end;

procedure TMyThread.SetDataString(S: string);
begin
  fCS.Enter;
  fDataString:= S;
  fCS.Leave;
end;

constructor TMyThread.Create;
begin
  fCS:= TCriticalSection.Create;
  inherited Create(false);
end;

destructor TMyThread.Destroy;
begin
  FreeAndNil(fCS);
  inherited Destroy;
end;


Меня тревожит момент (если не использовать критические секции): пока будет выполняться "Блок обработки внешних данных", то главный поток может успеть изменить fDataString вызовом SetDataString(S: string). В принципе, можно сделать:

Код: Выделить всё
procedure TMyThread.Execute;
var
  _s: string;
begin
  while not Terminated do begin
    {... any other do ...}
    // BEGIN - Блок обработки внешних данных
    _s:= fDataString;
    fDataString:= '';
    if (_s.Length > 0) then begin
       {... any do ...};
    end;
    // END - Блок обработки внешних данных
    {... any other do ...}
    sleep(1);
  end;
end;


Но можно-ли так, и лучше-ли такой вариант?

Может есть другие способы?
Аватара пользователя
VirtUX
энтузиаст
 
Сообщения: 880
Зарегистрирован: 05.02.2008 10:52:19
Откуда: Крым, Алушта

Re: Как правильно передать строку в работающий поток?

Сообщение wavebvg » 18.06.2021 17:37:40

Можно читать/обновлять данные из нескольких потоков через отправку сообщений.
wavebvg
постоялец
 
Сообщения: 354
Зарегистрирован: 28.02.2008 04:57:35

Re: Как правильно передать строку в работающий поток?

Сообщение Дож » 18.06.2021 17:40:07

Использовать средство синхронизации обязательно. Второй вариант кода (без критических секций) настолько неправильный и недопустимый, что я не знаю что комментировать в нём в первую очередь. Наверное, прежде всего имеет смысл изучить какие проблемы и баги могут возникнуть в мультипоточных приложения, что такое атомарные и неатомарные операторы, зачем нужны средства синхронизации в целом и TCriticalSection в частности.

Основная проблема второго фрагмента кода заключается в том, что оператор копирования строки не атомарный. Он делает по крайней мере два шага: скопировать указатель на строку, и увеличить счётчик ссылок. Помимо этого, возможно, он делает что-то из: уменьшить счётчик предыдущей строки, освободить память старой строки и сдать её блок памяти обратно в менеджер памяти. Если два потока используют одну и ту же строку одновременно (например, _s:= fDataString и fDataString:= S) результат непредсказуем вплоть до порчи памяти и крашей.
Аватара пользователя
Дож
энтузиаст
 
Сообщения: 899
Зарегистрирован: 12.10.2008 16:14:47

Re: Как правильно передать строку в работающий поток?

Сообщение VirtUX » 18.06.2021 18:00:21

Дож писал(а):Основная проблема второго фрагмента кода заключается в том, что оператор копирования строки не атомарный.


Понял. Спасибо!
Аватара пользователя
VirtUX
энтузиаст
 
Сообщения: 880
Зарегистрирован: 05.02.2008 10:52:19
Откуда: Крым, Алушта

Re: Как правильно передать строку в работающий поток?

Сообщение runewalsh » 18.06.2021 18:06:57

Общее правило: всё верно, при работе с расшаренными данными СОХРАНИ ИХ СНАЧАЛА, ЧЁРТ ПОБЕРИ, В ЛОКАЛЬНУЮ ПЕРЕМЕННУЮ. (Обычно после такого сохранения можно и блокировку отпустить.)
Проще всего с критическими секциями. Но можно сделать и без них:

Вариант 1, сложный. Через Interlocked*.
Функция InterlockedCompareExchange(var target: pointer; value: pointer; comparand: pointer): pointer (https://ru.wikipedia.org/wiki/Сравнение_с_обменом) выполняет атомарный эквивалент
Код: Выделить всё
result := target;
if target = comparand then target := value;

Функция InterlockedExchange(var target: pointer; value: pointer): pointer выполняет атомарный эквивалент
Код: Выделить всё
result := target;
target := value;

Их можно использовать в сочетании с тем фактом, что string хранится как указатель. Но нужно будет вручную проследить за счётчиками ссылок, т. к. функции работают с сырыми указателями и компилятор не вставляет магию с fpc_addref / fpc_release, вставляемую при обычных операциях, когда
Код: Выделить всё
var a, b: string;
a := b;
расписывается в
Код: Выделить всё
Finalize(a);
fpc_addref(b);
pointer(a) := pointer(b);


Код: Выделить всё
type
   TMyThread = class(TThread)
   private
      fDataString: string;
   protected
      procedure Execute; override;
   public
      procedure SetDataString(const s: string);
      function FetchDataString: string;
   end;

   function fpc_addref(data, typeInfo: pointer): SizeInt; [external name 'FPC_ADDREF'];

   procedure TMyThread.Execute;
   var
      ds: string;
   begin
      while not Terminated do
      begin
         // ...
         // Блок обработки внешних данных
         ds := FetchDataString;
         if ds.Length > 0 then
         begin
            // ...
         end;
         // ...
         Sleep(1);
      end;
   end;

   procedure TMyThread.SetDataString(const s: string);
   begin
      if InterlockedCompareExchange(pointer(fDataString), pointer(s), nil) = nil then
         // Если старое значение равно nil — сырой указатель s скопирован в fDataString, нужно аддрефнуть.
         fpc_addref(@s, TypeInfo(s))
      else
         // Иначе InterlockedCompareExchange ничего не сделала. Это если повторные SetDataString нужно игнорировать.
         ;
   end;

   function TMyThread.FetchDataString: string;
   begin
      Finalize(result); // result не обязан быть пустым, а нам нужен пустой, т. к. в следующей строчке он буквально затирается.
      pointer(result) := InterlockedExchange(pointer(fDataString), nil);
      // ничего AddRef'ать не нужно, fDataString занулена.
   end;
(Можно извернуться и сделать без fpc_addref, чтобы это не было FPC-only, но это будет выглядеть ещё более магически.)
В этом варианте FetchDataString «крадёт» fDataString, зануляя старое значение.

Вариант 2. Просто забить.
Вообще-то присваивание примитивных типов, в том числе указателей (при некоторых оговорках, типа соблюдения выравнивания) атомарно. То есть если у тебя в коде указатель 11111111 меняется на 22222222, то другой поток и без синхронизации вскоре увидит эту же смену 11111111 на 22222222 — он никогда не прочитает 11222222, или мусор, или снова внезапное 11111111 после того, как уже увидел 22222222, и т. д.

В случае со строками, хотя к ним добавляется магия компилятора, она также потокобезопасна (и работа со счётчиками ссылок, и GetMem/FreeMem), поэтому товарища выше не слушай — всё будет работать. Нужно просто сохранить строку в локальную переменную, чтобы она не изменилась у нас под носом.
Код: Выделить всё
type
   TMyThread = class(TThread)
   private
      fDataString: string;
   protected
      procedure Execute; override;
   public
      procedure SetDataString(const s: string);
   end;

   procedure TMyThread.Execute;
   var
      ds: string;
   begin
      while not Terminated do
      begin
         // ...
         // Блок обработки внешних данных
         ds := fDataString;
         if ds.Length > 0 then
         begin
            // ...
         end;
         // ...
         Sleep(1);
      end;
   end;

   procedure TMyThread.SetDataString(const s: string);
   begin
      fDataString := s;
   end;

Если вместе со строкой должно обязательно устанавливаться что-то ещё — флаг «somethingChanged» какой-нибудь, или 2 таких строки должны установиться одновременно, то без критической секции всё же не обойтись. Но если у тебя реально 1 строка, или даже не одна, но гонки не критичны (неодновременная установка 2 строк ничего не сломает) — можно и так.

P. S. Ещё одно правило: по возможности не используй Sleep и уж точно НИКОГДА не используй Sleep(1), и даже Sleep(100). Используй события (RTLEventCreate или SyncObjs.TEventObject) и ожидание событий, опять же, по возможности без таймаута (timeout=INFINITE).
Последний раз редактировалось runewalsh 19.06.2021 11:20:24, всего редактировалось 1 раз.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 578
Зарегистрирован: 27.04.2010 00:15:25

Re: Как правильно передать строку в работающий поток?

Сообщение wavebvg » 18.06.2021 18:26:38

runewalsh писал(а):Просто забить.

Для типов, где осуществляются неявные преобразования (подсчет ссылок для строк) и их значения не берутся из констант -- совет очень неоднозначный.
В обычном коде, да, вероятность ничтожно мала, но раз в год и незаряженное ружьё стреляет. Но если Вы в несколько потоков читаете/пишете данные, используя параллельную и конкуретную обработку одной коллекции, то костей потом уже не соберёте.
wavebvg
постоялец
 
Сообщения: 354
Зарегистрирован: 28.02.2008 04:57:35

Re: Как правильно передать строку в работающий поток?

Сообщение runewalsh » 18.06.2021 18:52:25

wavebvg
Она не ничтожно мала, она нулевая, не пугай людей. Строки специально сделаны так, чтобы код
Код: Выделить всё
// var a, b: string;
a := b;

работал без блокировок, потому что из-за copy-on-write возможна ситуация, когда 2 потока работают с одной строкой и не знают об этом. Посмотри пример безблокировочного стека в той статье в вики: тебе даны достаточные гарантии, чтобы вместо
Код: Выделить всё
EnterCriticalSection;
A := A + B;
LeaveCriticalSection;

можно было делать
Код: Выделить всё
repeat
  localA := A;
  localB := B;
until InterlockedCompareExchange(A, localA + localB, localA) = localA;

где чтения A в localA и B в localB выполняются без каких-либо блокировок и это будет работать несмотря на то, что прямо сейчас другой поток может выполнять A := newA.

С поэлементно расшаренными коллекциями это не сработает, но про них никто и не говорил. Множество задач подразумевают просто необходимость передать пачку данных (пусть даже коллекцию — но всю) между двумя конкретными потоками, а любой объект можно выделить динамически, сведя эту передачу к передаче указателя.

Опять же, (высоко)конкурентная работа с коллекциями под одной критической секцией — очень такая себе идея. Если тебе нужно обработать коллекцию из 1000 объектов в 10 потоков, ты нумеруешь потоки от 0 до 9, выдаёшь 0-му объекты 0–99, 1-му — 100–199, и т. д., ждёшь через TThread/TEventObject.WaitFor и упс, блокировки снова не нужны.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 578
Зарегистрирован: 27.04.2010 00:15:25

Re: Как правильно передать строку в работающий поток?

Сообщение Дож » 18.06.2021 19:18:34

runewalsh писал(а):wavebvg
Она не ничтожно мала, она нулевая, не пугай людей. Строки специально сделаны так, чтобы код
Код: Выделить всё
// var a, b: string;
a := b;

работал без блокировок

Ссылку на соотвествующее утверждение в документации к FPC в студию.
Аватара пользователя
Дож
энтузиаст
 
Сообщения: 899
Зарегистрирован: 12.10.2008 16:14:47

Re: Как правильно передать строку в работающий поток?

Сообщение runewalsh » 18.06.2021 20:35:05

Free Pascal занимается, грубо говоря, три калеки, чтобы, как в C++, документировать модель памяти и прочее, вот что я писал про атомарные присваивания примитивных типов. Но это единственно возможное следствие из логических рассуждений.

Представь, что ты создал строку 'я строка', адрес которой в памяти — 123, передал её в модуль X, который сохранил её себе и потом на запрос другим потоком вернул её же. Модулю X никто не говорил, что перед работой с этой строкой нужно захватить такую-то блокировку, поэтому если бы присваивания строк были потоконебезопасными, всё бы заведомо навернулось. К тому же потоконебезопасным строкам не нужны были бы атомарные операции над счётчиком ссылок. (К TInterfacedObject и присваиванию интерфейсных ссылок всё то же самое относится, кстати.)
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 578
Зарегистрирован: 27.04.2010 00:15:25

Re: Как правильно передать строку в работающий поток?

Сообщение Дож » 18.06.2021 20:50:16

Ок, давайте поговорим предметно. Беру программу:

Код: Выделить всё
{$ifdef unix}uses cthreads;{$endif}

var
  A, B, C: AnsiString;

function CPU1(P: Pointer): PtrInt;
begin
  A := B;
  Exit(0);
end;

function CPU2(P: Pointer): PtrInt;
begin
  B := C;
  Exit(0);
end;

begin
  A := 'a'; UniqueString(A);
  B := 'b'; UniqueString(B);
  C := 'c'; UniqueString(C);
  BeginThread(@CPU1);
  BeginThread(@CPU2);
  Writeln(A);
  Writeln(B);
  Writeln(C);
end.


Вы утверждаете, что операторы присваивания внутри CPU1 и CPU2 атомарны.

Компилирую fpc -al a.pas, читаю ассемблерный код:
Код: Выделить всё
# [8] A := B;
   movl   U_$P$PROGRAM_$$_B,%edx
   movl   $U_$P$PROGRAM_$$_A,%eax
   call   fpc_ansistr_assign

...

# [14] B := C;
   movl   U_$P$PROGRAM_$$_C,%edx
   movl   $U_$P$PROGRAM_$$_B,%eax
   call   fpc_ansistr_assign


Эти два блока данных выполняются параллельно. Одна из возможных последовательностей выполнения этих инструкций следующая:
Код: Выделить всё
movl U_$P$PROGRAM_$$_B,%edx   // CPU1    A := B

movl U_$P$PROGRAM_$$_C,%edx   // CPU2    B := C
movl $U_$P$PROGRAM_$$_B,%eax  // CPU2    B := C
call fpc_ansistr_assign       // CPU2    B := C

movl $U_$P$PROGRAM_$$_A,%eax  // CPU1    A := B
call fpc_ansistr_assign       // CPU1    A := B


Я утверждаю, что при такой последовательности выполнения CPU2 уничтожит строку B и освободит её из памяти, после чего CPU1 будет к ней обращаться, потому что держит указатель в edx. Это может привести к крашу или некорректной работе.

Где я не прав?
Аватара пользователя
Дож
энтузиаст
 
Сообщения: 899
Зарегистрирован: 12.10.2008 16:14:47

Re: Как правильно передать строку в работающий поток?

Сообщение runewalsh » 18.06.2021 21:27:15

Блин, хех, мда. Ты прав.
-НО- — так, падажжи, для своего-то примера ты прав, но я именно имея в виду нечто подобное подчеркнул, что нужно ЧИТАТЬ ВСЁ В ЛОКАЛЬНЫЕ ПЕРЕМЕННЫЕ перед обработкой. Как раз чтобы гарантировать, что ссылка на обрабатываемый объект никуда не улетит.
Твои процедуры должны быть переделаны следующим образом:
Код: Выделить всё
function CPU1(P: Pointer): PtrInt;
var
   t: string;
begin
   t := B;
   A := t;
   Exit(0);
end;

function CPU2(P: Pointer): PtrInt;
var
   t: string;
begin
   t := C;
   B := t;
   Exit(0);
end;

Вообще-то это багофича компилятора и в идеале твой код должен бы работать, но если компилятор будет всё автоматически через такие локальные t расписывать, строки будут тормозить ещё сильнее (они и так это делают безбожно — по слухам, их даже джаваскриптовые уделают, порой в несколько раз). Сравни с тем, как const- (да и var-) параметры не трогают счётчики ссылок (на этот раз с явным упоминанием в документации), что тоже может приводить к проблемам в крайних случаях.
Последний раз редактировалось runewalsh 18.06.2021 22:17:58, всего редактировалось 2 раз(а).
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 578
Зарегистрирован: 27.04.2010 00:15:25

Re: Как правильно передать строку в работающий поток?

Сообщение Дож » 18.06.2021 21:55:32

нужно ЧИТАТЬ ВСЁ В ЛОКАЛЬНЫЕ ПЕРЕМЕННЫЕ перед обработкой

А как читать в локальную переменную -- обычным оператором присваивания строк, который не атомарный, или как-то по-особому? В вашем коде который про "забить" и в коде топикстартера я отчётливо вижу A:=B и B:=C в двух параллельных потоках.

Твои процедуры должны быть переделаны следующим образом:

В этом варианте t:=B и B:=t могут выполнится параллельно и иметь те же проблемы, что я описывал ранее. (Если CPU1 сохранит указатель на B в регистр, а CPU2 уничтожит B.)

Сравни с тем, как const- (да и var-) параметры не трогают счётчики ссылок (на этот раз с явным упоминанием в документации), что тоже может приводить к проблемам в крайних случаях.

const- может так работать, потому что гарантируется условие, что на строку ссылается вызывающий код всё время работы вызванного кода. var параметр так работать не может.
Аватара пользователя
Дож
энтузиаст
 
Сообщения: 899
Зарегистрирован: 12.10.2008 16:14:47

Re: Как правильно передать строку в работающий поток?

Сообщение runewalsh » 18.06.2021 22:39:37

Э, чёрт. Да. Ты всё-таки прав.
Короче, без синхронизации работать и правда нельзя; атомарные счётчики строк — они как раз для случая, когда два потока не знают, что работают с одной строкой, но не дают права долбиться в одну и ту же переменную, К СОЖАЛЕНИЮ. Либо критические секции, либо мой вариант с интерлокедами. Но он, наверное, обречён работать с семантикой «кражи» значения, то есть его нельзя адаптировать под раздачу fDataString нескольким потокам.

КАРОЧ, ФИНАЛЬНЫЙ ВАРИАНТ:
Код: Выделить всё
type
   TMyThread = class(TThread)
   private
      fDataString: string;
      fDataStringLock: TCriticalSection;
   protected
      procedure Execute; override;
   public
      procedure SetDataString(const s: string);
   end;

   procedure TMyThread.Execute;
   var
      ds: string;
   begin
      while not Terminated do
      begin
         // ...
         // Блок обработки внешних данных
         fDataStringLock.Enter;
         ds := fDataString;
         fDataStringLock.Leave;
         if ds.Length > 0 then
         begin
            // ...
         end;
         // ...
         Sleep(1);
      end;
   end;

   procedure TMyThread.SetDataString(const s: string);
   begin
      fDataStringLock.Enter;
      fDataString := s;
      fDataStringLock.Leave;
   end;
(Можно даже, аналогично SetDataString, вынести чтение под блокировкой в GetDataString.)

Добавлено спустя 5 минут 3 секунды:
1.png
Унижен, мда.
Аватара пользователя
runewalsh
энтузиаст
 
Сообщения: 578
Зарегистрирован: 27.04.2010 00:15:25

Re: Как правильно передать строку в работающий поток?

Сообщение iskander » 19.06.2021 05:58:02

Кажется, нужна ещё какая-нибудь очередь, иначе строки могут теряться.
iskander
энтузиаст
 
Сообщения: 590
Зарегистрирован: 08.01.2012 18:43:34

Re: Как правильно передать строку в работающий поток?

Сообщение Alex2013 » 20.06.2021 11:33:13

Есть еще одни вариант ... Можно сделать поток без "бесконечного цикла" .
Работает так: готовишь данные и кидаешь их во внутренние переменные потока .
Запускаешь поток и по таймеру поверяешь его завершение и как только он завершится сразу запускаешь с новыми данными.
"Асинхронность" чуть ниже чем в случае "бесконечного цикла" но для большинства возможных применений ее хватает .
Но достоинства подобного способа использования потоков очевидны !
1 Поток работает только с внутренними данным
2 Если поток повис, можно остановить его по таймауту.
3 Никаких проблем с согласованием (поток работает в "режиме черного ящика" и все синхронизации зависят только от кода находящегося вне потока )
4 Если нужно запускать целый "пул потоков" это тоже сделать значительно проще.
5 В принципе таймер можно заменить "вспомогательными потоком" но это уже чуть портит идею "режима черного ящика" .
( "вспомогательный поток" должен "подбирать" внешние данные и скидывать результат что нарушает полную изоляцию потока)
хотя то что он является только "посредником"(т.е. не использует данные самостоятельно ) чуть облегчает ситуацию.
Alex2013
долгожитель
 
Сообщения: 2926
Зарегистрирован: 03.04.2013 11:59:44


Вернуться в Общее

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 7

Рейтинг@Mail.ru