Final Message Preparation for Parallel Enabled Processes
Use this snippet to conclude the execution of parallel-enabled processes. Every thread, including the main process, writes its messages to the }APQ C3 Thread Control
cube; the main process then consolidates all child-thread messages into a single message in the }APQ Process Response Message
cube.
Prerequisite:
Prolog:
# Name of the running process; used as an element name in the thread control cube.
cThisProcName = GetProcessName();

# Cube in which every thread records its status and messages.
cCubThreadCtrl = '}APQ C3 Thread Control';

# Resolve the user the thread entries are stored under.
# When the current TM1 user is not an element of }Clients, 'Admin' is used instead.
cUserName = TM1User();
If( DIMIX( '}Clients', cUserName ) = 0 );
    cUserName = 'Admin';
EndIf;
Epilog:
Provide a variable holding the number of failed child threads. This code comes from the parallelization snippets (see the parallelization chapter).
# Read the 'Error Flag' value stored at the 'Total Threads' element for this
# user and process; the final message handling treats any value > 0 as
# "at least one child thread failed".
nChildErrors = CellGetN(
    cCubThreadCtrl,
    cUserName,
    cThisProcName,
    'Total Threads',
    'Error Flag'
);
Epilog:
#Region - Return code & final error message handling
# Purpose:
#   Builds the final result header/message and the result JSON of this thread,
#   stores them in the thread control cube and - for the main thread
#   (nThreadID = 0) - consolidates the child thread JSON messages and logs the
#   overall result through }APQ.Cub.ProcessResponseMessage.ImmediateLog.
#
# Reads (expected to be set earlier in the process, not visible in this snippet):
#   nErrors, nChildErrors, nThreads, nThreadID, pThreadID, nMultiMsgJSON,
#   sMultiMsg, sMessage, sProcLogParams, nProcessStartTime, nTimeStart,
#   cSecFactor, nTotalRecordCount, nDataRecordCount, cMsgErrorLevel,
#   cMsgErrorContent, cCubThreadCtrl, cUserName, cThisProcName.
#
# Sets:
#   sProcessResultType, sProcessResultMsg, sProcessResultJSON, sMessage,
#   nProcessReturnCode (0 = error, 1 = success), sProcessReturnCode.

sProcessResultType = If( nErrors = 0 & nChildErrors = 0, 'success', 'danger' );

If( nErrors > 0 % nChildErrors > 0 );

    # ---------------------------------------------------------------- failure
    If( nErrors > 0 );
        # Errors of this process itself take precedence over child thread errors.
        sErrors = NumberToString( nErrors );
        sProcessResultMsg = Expand( 'Process has finished with %sErrors% error' | If( nErrors > 1, 's', '') | '.' );
    ElseIf( nChildErrors > 0 );
        sErrors = NumberToString( nChildErrors );
        If( nChildErrors = 1 );
            sProcessResultMsg = Expand( 'One of the child processes has finished with errors. Please check details for more information.' );
        Else;
            sProcessResultMsg = Expand( '%sErrors% child processes have finished with errors. Please check details for more information.' );
        EndIf;

        # Collect the JSON message of every child thread as "<thread id>": <json>.
        # NOTE(review): the loop reads threads 1 .. nThreads-1. Confirm that nThreads
        # counts the main thread; otherwise the last child thread is never read.
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            # An empty cell means the thread has not written a JSON message. It is
            # skipped so the consolidated object never contains a key without a value.
            If( sThreadMessage @<> '' );
                # Plain concatenation: the stored JSON is appended verbatim instead of
                # being passed through Expand.
                sThreadDetails = sThreadDetails | If( LONG( sThreadDetails ) = 0, '', ', ' ) | '"' | sThreadID | '": ' | sThreadMessage;
            EndIf;
            nThread = nThread + 1;
        End;

        # Append the thread details as the next numbered entry of the multi-message list.
        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = ' "thread_details": {' | sThreadDetails | '}';
            sMultiMsg = sMultiMsg | If( LONG( sMultiMsg ) = 0, '', ', ' ) | '"' | sSequenceID | '":{' | sMessageJSON | '}';
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    EndIf;

    # Result JSON: a single "message" when at most one message exists, otherwise the
    # collected "details". sMessage still holds the value set before this region here;
    # it is replaced by the result header only after the JSON has been built.
    If( nMultiMsgJSON <= 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    EndIf;
    sMessage = sProcessResultMsg;

    nProcessReturnCode = 0;
    sProcessReturnCode = sProcessResultJSON;

    If( nThreadID = 0 );
        # Main thread: write the consolidated failure to the process response message
        # cube, log it and stop. ProcessQuit ends the process here, so the thread
        # control writes below are only reached by child threads.
        # NOTE(review): on error the main thread therefore never records its own
        # status in the thread control cube - confirm this is intended.
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Error',
            'pSuccessMessage', '',
            'pFailureMessage', sProcessResultMsg,
            'pJSONMessage', sProcessReturnCode
        );
        LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );
        ProcessQuit();
    EndIf;

    # Child thread: record the failure in the thread control cube.
    CellPutS( 'Error', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 1, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );

Else;

    # ---------------------------------------------------------------- success
    If( nThreadID = 0 );
        # Main thread reports the total record count and consolidates child messages.
        nRecordCount = nTotalRecordCount;

        # NOTE(review): the loop reads threads 1 .. nThreads-1. Confirm that nThreads
        # counts the main thread; otherwise the last child thread is never read.
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            # An empty cell means the thread has not written a JSON message. It is
            # skipped so the consolidated object never contains a key without a value.
            If( sThreadMessage @<> '' );
                sThreadDetails = sThreadDetails | If( LONG( sThreadDetails ) = 0, '', ', ' ) | '"' | sThreadID | '": ' | sThreadMessage;
            EndIf;
            nThread = nThread + 1;
        End;

        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = ' "thread_details": {' | sThreadDetails | '}';
            sMultiMsg = sMultiMsg | If( LONG( sMultiMsg ) = 0, '', ', ' ) | '"' | sSequenceID | '":{' | sMessageJSON | '}';
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    Else;
        # Child thread reports only the records it processed itself.
        nRecordCount = nDataRecordCount;
    EndIf;

    nTime = NOW();
    nDeltaTime = ROUND( (nTime - nTimeStart) \ cSecFactor );
    sDeltaTime = NumberToString( nDeltaTime );
    #@FIX:588:Average throughput uses total record count as basis
    # The "\" operator yields 0 when the divisor is 0, so a run shorter than one
    # time unit does not fail here.
    sRateAVG = NumberToString( nRecordCount \ nDeltaTime );
    sRecordCount = NumberToString( nRecordCount );

    # The message is the same for single- and multi-threaded runs.
    If( nRecordCount = 0 );
        sMessage = Expand( 'No records were processed!' );
        sProcessResultMsg = Expand( 'Process has finished without errors.' );
    Else;
        #@FIX:588:Total records reporting fix
        sMessage = Expand( 'Successfully processed %sRecordCount% records in %sDeltaTime% s. Average throughput=%sRateAVG% records/s.' );
        sProcessResultMsg = Expand( 'Process has finished successfully.' );
    EndIf;

    If( nMultiMsgJSON <= 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    EndIf;

    nProcessReturnCode = 1;
    sProcessReturnCode = sProcessResultJSON;

    #@FIX:199:Write back to message cube just in case of main thread
    If( nThreadID = 0 );
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Success',
            'pSuccessMessage', sProcessResultMsg,
            'pFailureMessage', '',
            'pJSONMessage', sProcessReturnCode
        );
    EndIf;

    #@FIX:2020.11.1:230:Each thread will log its Status, Error Flag, Message and JSON Message into }APQ C3 Thread Control cube
    CellPutS( 'Success', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 0, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput('INFO', Expand( 'Process:%cThisProcName% (ThreadID=%pThreadID%): %sMessage%' ) );

EndIf;
#EndRegion - Return code & final error message handling