Executar um “select” em paralelo na verdade é bem simples, mas complexo se não deter algumas informações elementares sobre o tratamento que o FireDAC dá ao isolamento de conexão.

Basicamente o processo se dá pelo isolamento da “connection” ao fazer a chamada no banco de dados, talvez aí o maior problema já que é necessário ter uma conexão individual para cada chamada ao banco.

No FireDAC esta disponível na IDE o componente TFDMonitor que é responsável em gerar um novo componente de conexão para cada chamada feita. Este é o caminho mais fácil para utilizar “multithreaded”, assunto que não irei tratar aqui já que o interessante é explorar recursos que entregue mais conteúdo a quem quer entender como o processo acontece.

em construção …. ajude a escrever este texto enviando comentários. Gratidão

Base para o texto:
Unit: Data.FireDAC.Helper
Ver: Exemplo no GIT

Congelando a janela com TTask.WaitForAll ???

#compartilhandoconhecimento #wba10anos
Depois que publiquei o vídeo Papo sobre POO (TTask e outros), recebi um comentário que me deixou intrigado.

Porquê o a janela principal trava quando executo   TTask.WaitForAll(  ….  );

Fui dar uma olhando como foi implementado o método – observei que é feito uma chamada para uma camada de TEvent que é implementado nas chamadas internas da rotina. Por traz da mecânica com TEvent é feito uso de  WaitForSingleObject – que é uma camada de acesso a biblioteca do windows.

A alteração não é trivial. O primeiro problema é como reescrever o método considerando que o array passado como parametro é um   .. AArray: array of ITask… qualquer deslize no seu uso vai provocar um incremento no contador RefCount da interface e pode levar a perda de controle no autofree do processo…
Para não causar um incremento do RefCount é preciso fazer uso da instrução  [unsafe] o que foi feito através de um “wrapper” para um record marcado para não incrementar o RefCount.

Contornado a questão de referência, o próximo obstáculo é encontrar um mecanismo que permita parar o processamento sem congelar a janela…

Depois de várias tentativas a solução encontrada foi o “infamous” application.processmessage. Esta não é uma boa opção, já que  mantém o processador em atividade,  quando o ideal seria encontrar um modelo que não fizesse uso do processador enquanto esta atualizando a janela principal.

Primeiramente foi criado um Class Helper para o TTask:

 

[code lang=”pascal”]

Type
TTaskHelper = class helper for TTask
private type
TUnsafeTaskEx = record
private
[Unsafe]
// preciso de um record UNSAFE para nao incrementar o RefCount da Interface
FTask: TTask;
public
property Value: TTask read FTask write FTask;
end;
public
class function WaitForAllEx(AArray: Array of ITask;
ATimeOut: int64 = INFINITE): boolean;
end;

[/code]

Versão 1. Implementando o método:

[code lang=”pascal”]
class function TTaskHelper.WaitForAllEx(AArray: array of ITask;
ATimeOut: int64 = INFINITE): boolean;
var
task: TUnsafeTaskEx;
i: integer;
taskInter: TArray<TUnsafeTaskEx>;
completou: boolean;
Canceled, Exceptions: boolean;
begin
Canceled := false;
Exceptions := false;
result := true;
try
for i := low(AArray) to High(AArray) do
begin
task.Value := TTask(AArray[i]);
if task.Value = nil then
raise EArgumentNilException.Create(‘Wait Nil Task’);

completou := task.Value.IsComplete;
if not completou then
begin
taskInter := taskInter + [task];
end
else
begin
if task.Value.HasExceptions then
Exceptions := true
else if task.Value.IsCanceled then
Canceled := true;
end;
end;

try
for task in taskInter do
begin
while not task.Value.IsComplete do
begin
try
TThread.Queue(nil,
procedure
begin
application.ProcessMessages;
end);
finally
end;
end;
if task.Value.IsComplete then
begin
if task.Value.HasExceptions then
Exceptions := true
else if task.Value.IsCanceled then
Canceled := true;
end;
end;
finally
end;
except
result := false;
end;

if (not Exceptions and not Canceled) then
Exit;
if Exceptions or Canceled then
raise EOperationCancelled.Create
(‘One Or More Tasks HasExceptions/Canceled’);

end;

[/code]

Versão 2. Revisando o código para um uso mais eficiente com MsgWaitForMultipleObjectsEx:
[code lang=”pascal”]
class function TTaskHelper.WaitForAllEx(AArray: array of ITask;
ATimeOut: int64 = INFINITE): boolean;
var
FEvent: TEvent;
task: TUnsafeTaskEx;
i: integer;
taskInter: TArray<TUnsafeTaskEx>;
completou: boolean;
Canceled, Exceptions: boolean;
ProcCompleted: TProc<ITask>;
LHandle: THandle;
LStop: TStopwatch;
begin
LStop := TStopwatch.StartNew;
ProcCompleted := procedure(ATask: ITask)
begin
FEvent.SetEvent;
end;

Canceled := false;
Exceptions := false;
result := true;
try
for i := low(AArray) to High(AArray) do
begin
task.Value := TTask(AArray[i]);
if task.Value = nil then
raise EArgumentNilException.Create(‘Wait Nil Task’);

completou := task.Value.IsComplete;
if not completou then
begin
taskInter := taskInter + [task];
end
else
begin
if task.Value.HasExceptions then
Exceptions := true
else if task.Value.IsCanceled then
Canceled := true;
end;
end;

try
FEvent := TEvent.Create();
for task in taskInter do
begin
try
FEvent.ResetEvent;
if LStop.ElapsedMilliseconds > ATimeOut then
break;
LHandle := FEvent.Handle;
task.Value.AddCompleteEvent(ProcCompleted);
while not task.Value.IsComplete do
begin
try
if LStop.ElapsedMilliseconds > ATimeOut then
break;
if MsgWaitForMultipleObjectsEx(1, LHandle,
ATimeOut – LStop.ElapsedMilliseconds, QS_ALLINPUT, 0)
= WAIT_OBJECT_0 + 1 then
application.ProcessMessages;
finally
end;
end;
if task.Value.IsComplete then
begin
if task.Value.HasExceptions then
Exceptions := true
else if task.Value.IsCanceled then
Canceled := true;
end;
finally
task.Value.removeCompleteEvent(ProcCompleted);

end;
end;
finally
FEvent.Free;
end;
except
result := false;
end;

if (not Exceptions and not Canceled) then
Exit;
if Exceptions or Canceled then
raise EOperationCancelled.Create
(‘One Or More Tasks HasExceptions/Canceled’);

end;

[/code]

Reescrevendo o Exemplo:  Dia11_Threading_TParallel

 

Este é um comportamento quando o SO é windows. Em outras plataformas o resultado poderá ser outro.

 

 

[usa LogEvents]
Tenho uma quantidade de produtos relativamente grande que requer processamento de custos de produção envolvendo custo de matérias primas, mão-de-obra e outros custos vinculado a célula de produção.

A modelagem prevê que uma ficha de produção pode conter outras fichas formando uma lista de dependências dos processos o que gera processamento recursivo de dependências.

Como se pode imaginar, não é um processamento sequenciado tão simples e pode ser demorado em face a profundidade da arvore de dependência que um produto pode exigir.

Então repensando os processos, o desafio passou exigir processamento em paralelo das fichas de tal forma que fosse possível processar uma quantidade de produtos ao mesmo tempo e aproveitando melhor os recursos da máquina;

Neste cenário, saber qual o estágio de processamento de cada ficha e o onde se encontra o cálculo passou a ser requisito de interação com usuário;

Para executar vamos utilizar da biblioteca de processamento em paralelo do Delphi (introduzido no XE7, no exemplo usamos Berlin).

Passos:

  • Isolar as conexões de banco de dados para trata-las individualmente por Task;
  • Criar infraestrutura de comunicação entre o processamento e feedback com usuário;
  • Tratar a sincronização de informações geradas pelas várias TTasks em andamento informando a janela de progresso do usuário;
imagem_janela
Tendo em mente que o controle possa ser utilizado em outras aplicações, o uso de um procedimento ANONIMOUS me parece bastante resistente a diversidade de códigos a que poderá vir a ser utilizado.
Veja como ficou o exemplo de execução:

[code lang=”pascal”]

procedure TForm8.Button1Click(Sender: TObject);
var
LProgr: IProgressEvents;
i: integer;
begin
// inicializa a janela de progresso
LProgr := TProgressEvents.new;
LProgr.max := 100; // opcional: marca o número máximo itens
LProgr.MaxThreads := SpinEdit1.Value ; // indica o número máximo de threads em paralelo
LProgr.CanCancel := true; :// marca se pode cancelar a operação

for i := 1 to 100 do
begin // loop de demonstração – simulando uma lista de processos
LProgr.Text := ‘Produto: ‘ + intToStr(i); // texto livre

// onde as coisas acontecem…..
// adiciona o processo a ser executado e aponta o método anonimous as ser executado pela TTask
LProgr.add(i, ‘Produto: ‘ + intToStr(i), // processo a executar
procedure(x: integer)
var
n: integer;
msg: string;
begin
msg := ‘Produto: ‘ + intToStr(x); // processo em execução
LogEvents.DoProgress(self, 0, etStarting, msg); // notifica que o processo foi iniciado
n := Random(10000);

sleep(n);
LogEvents.DoProgress(self, 0, etWorking, msg); // notifica que esta em execução
// executa o código de calculo … aqui…
n := Random(10000);
if LProgr.Terminated then exit; // checa se o usuario cancelou a operação
sleep(n);
end);
if LProgr.Terminated then
break;
end;
LogEvents.DoProgress(self, 0, etAllFinished, ”); // sinaliza que todas os processo foram completados.
end;

[/code]

Código fonte com o Exemplo e classes que implementam a janela de monitoramento do progresso de cada thread.

Quando estamos rodando um código em um processo paralelo e internamente a Thread encontra pela frente uma EXCEPTION nada é apresentado para o usuário. Isto ocorre porque a Thread não tem como notificar a Thread Principal (do app) para mostrar a exceção ao usuário. Com isto não há um expediente para mostrar a exceção na thread principal.  escreve sobre o tema em seu blog Rob’s Technology Corner.

Robert propõe a rotina que gera erro para contextuar o problema:

[code lang=”pascal”]

procedure TForm5.Button1Click(Sender: TObject);
begin
Button1.Enabled := False;
SlowProc;
end;

procedure TForm5.FormDestroy(Sender: TObject);
begin
Task.Cancel;
end;

procedure TForm5.SlowProc;
begin
Task := TTask.Create( procedure
var
I : Integer;
begin
for I := 0 to 9 do
begin
if TTask.CurrentTask.Status = TTaskStatus.Canceled then
exit;
Sleep(1000);
if I = 2 then
raise EProgrammerNotFound.Create(‘Something bad just happened’);
end;
if TTask.CurrentTask.Status <> TTaskStatus.Canceled then
begin
TThread.Queue(TThread.CurrentThread,
procedure
begin
if Assigned(ListBox1) then
begin
Listbox1.Items.Add(’10 Seconds’);
Button1.Enabled := True;
end;
end);
end;
end);
Task.Start;
end;
[/code]

Executando o código é possível constatar que o procedimento levanta uma exceção e o usuário não recebe a informação de erro.

Stefan Glienke observando o que escreve Robert, propõe uma alteração em TTask para permitir tratar as exceções transparentes para o usuário e mais simples na implementação.
Glienke empresta de .NET uma implementação de Task.ContinueWith que permite continuar a execução após a ocorrência da exceção, veja como ficou.

[code lang=”pascal”]
unit ThreadingEx;

interface

uses
SysUtils,
Threading;

type
TAction<T> = reference to procedure(const arg: T);

TTaskContinuationOptions = (
NotOnCompleted,
NotOnFaulted,
NotOnCanceled,
OnlyOnCompleted,
OnlyOnFaulted,
OnlyOnCanceled
);

ITaskEx = interface(ITask)
[‘{3AE1A614-27AA-4B5A-BC50-42483650E20D}’]
function GetExceptObj: Exception;
function GetStatus: TTaskStatus;
function ContinueWith(const continuationAction: TAction<ITaskEx>;
continuationOptions: TTaskContinuationOptions): ITaskEx;

property ExceptObj: Exception read GetExceptObj;
property Status: TTaskStatus read GetStatus;
end;

TTaskEx = class(TTask, ITaskEx)
private
fExceptObj: Exception;
function GetExceptObj: Exception;
protected
function ContinueWith(const continuationAction: TAction<ITaskEx>;
continuationOptions: TTaskContinuationOptions): ITaskEx;
public
destructor Destroy; override;

class function Run(const action: TProc): ITaskEx; static;
end;

implementation

uses
Classes;

{ TTaskEx }

function TTaskEx.ContinueWith(const continuationAction: TAction<ITaskEx>;
continuationOptions: TTaskContinuationOptions): ITaskEx;
begin
Result := TTaskEx.Run(
procedure
var
task: ITaskEx;
doContinue: Boolean;
begin
task := Self;
if not IsComplete then
DoneEvent.WaitFor;
fExceptObj := GetExceptionObject;
case continuationOptions of
NotOnCompleted: doContinue := GetStatus <> TTaskStatus.Completed;
NotOnFaulted: doContinue := GetStatus <> TTaskStatus.Exception;
NotOnCanceled: doContinue := GetStatus <> TTaskStatus.Canceled;
OnlyOnCompleted: doContinue := GetStatus = TTaskStatus.Completed;
OnlyOnFaulted: doContinue := GetStatus = TTaskStatus.Exception;
OnlyOnCanceled: doContinue := GetStatus = TTaskStatus.Canceled;
else
doContinue := False;
end;
if doContinue then
continuationAction(task);
end);
end;

destructor TTaskEx.Destroy;
begin
fExceptObj.Free;
inherited;
end;

function TTaskEx.GetExceptObj: Exception;
begin
Result := fExceptObj;
end;

class function TTaskEx.Run(const action: TProc): ITaskEx;
var
task: TTaskEx;
begin
task := TTaskEx.Create(nil, TNotifyEvent(nil), action, TThreadPool.Default, nil);
Result := task.Start as ITaskEx;
end;

end.
[/code]

Como usar a nova implementação de TTask…
[code lang=”pascal”]
TTaskEx.Run(
procedure
begin
Sleep(2000);
raise EProgrammerNotFound.Create(‘whoops’)
end)
.ContinueWith(
procedure(const t: ITaskEx)
begin
TThread.Queue(nil,
procedure
begin
ShowMessage(t.ExceptObj.Message);
end);
end, OnlyOnFaulted);
[/code]
 

Ver PPL – TTask an example in how not to use

Executar uma query em segundo plano (em paralelo) não …é difícil de fazer, o seu controle é que
pode ser mais complexo.
Para executar em segundo plano basta:

[code lang=”pascal”]
TThread.CreateAnonymousThread(procedure
begin
ALQuery1.sql.Text := ‘….’;
ALQuery1.Open;
end).Start;
[/code]

ou

[code lang=”pascal”]
TThread.CreateAnonymousThread(
procedure
var i:integer;
begin
for I := 0 to 10 do
begin
// faz alguma coisa…
AlQuery1.execSQL;
end;
end).Start;
[/code]

Um pensamento simplista é você criar uma conexão para cada QUERY em separado. Se você tem
um CONNECTION isolado para UMA QUERY, então é possível executá-la em paralelo dentro de
uma nova Thread;

Algumas idéias onde pode utilizar o processo em paralelo:
– quando precisa registrar um log em uma tabela;
– se for possível adiantar um SELECT que será utilizado mais a frente;
– se precisa rodar um loop que não tem dependência com os próximos passos da sequencia do
código;
– quando precisa fazer um somatório de dados na tabela para mostrar o seu valor na janela…. e
liberar o usuário para continuar fazendo outras coisas.
As vezes você pode por um SELECT em paralelo e disparar outra sequencia de código… e mais
adiante aguardar o primeiro SELECT concluir… para depois então continuar…. Este é assunto para
outro POST para tratar de TTASK.