/
Final Message Preparation for Parallel Enabled Processes
Final Message Preparation for Parallel Enabled Processes
This snippet should be used to conclude the execution of parallel-enabled processes. Each thread, including the main process, writes its messages to the }APQ C3 Thread Control
cube; the main process then consolidates all child thread messages into a single message in the }APQ Process Response Message
cube.
Prerequisite:
Prolog:
# Control cube that receives each thread's Status, Error Flag, Message and JSON Message.
cCubThreadCtrl = '}APQ C3 Thread Control';

# Name of the running process; used as an element when addressing the control cube.
cThisProcName = GetProcessName();

# Current user; falls back to 'Admin' when that user is not an element of }Clients.
cUserName = TM1User();
If( DIMIX( '}Clients', cUserName ) = 0 );
    cUserName = 'Admin';
EndIf;
Epilog:
Provide a variable indicating the number of failed child threads. This code comes from the parallelization snippets (see the parallelization chapter).
# Number of failed child threads: the 'Error Flag' value stored at the 'Total Threads' element
# of the thread control cube for the current user and process.
nChildErrors = CellGetN( cCubThreadCtrl, cUserName, cThisProcName, 'Total Threads', 'Error Flag' );
Epilog:
#Region - Return code & final error message handling
# Concludes a parallel-enabled process:
#   1. builds the result header (sProcessResultMsg), the message (sMessage) and the JSON payload (sProcessResultJSON);
#   2. on the main thread (nThreadID = 0) logs the result through }APQ.Cub.ProcessResponseMessage.ImmediateLog;
#   3. writes Status / Error Flag / Message / JSON Message for this thread into the thread control cube.
# Read here but not assigned in this region (must be provided by the surrounding process):
#   cCubThreadCtrl, cUserName, cThisProcName, nErrors, nChildErrors, nThreads, nThreadID, pThreadID,
#   nMultiMsgJSON, sMultiMsg, sMessage, sProcLogParams, nProcessStartTime, nTimeStart, cSecFactor,
#   nTotalRecordCount, nDataRecordCount, cMsgErrorLevel, cMsgErrorContent.
# Sets nProcessReturnCode (0 = failure, 1 = success) and sProcessReturnCode (the JSON payload).

sProcessResultType = If( nErrors = 0 & nChildErrors = 0, 'success', 'danger' );

If( nErrors > 0 % nChildErrors > 0 );

    # ---------------------------------------------------------------- failure path
    If( nErrors > 0 );
        # Errors raised by this process itself take precedence over child thread errors.
        sErrors = NumberToString( nErrors );
        sProcessResultMsg = Expand( 'Process has finished with %sErrors% error' | If( nErrors > 1, 's', '') | '.' );
    Else;
        # Only child threads failed (nChildErrors > 0 is implied by the outer condition).
        sErrors = NumberToString( nChildErrors );
        If( nChildErrors = 1 );
            sProcessResultMsg = Expand( 'One of the child processes has finished with errors. Please check details for more information.' );
        Else;
            sProcessResultMsg = Expand( '%sErrors% child processes have finished with errors. Please check details for more information.' );
        EndIf;

        # Collect the JSON message of every child thread as "<thread id>": <json>, comma separated.
        # NOTE(review): the loop stops at nThreads - 1 - confirm that child thread IDs run 1..nThreads-1 and not 1..nThreads.
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            sThreadDetails = Expand( '%sThreadDetails%' | If( LONG( sThreadDetails ) = 0, '', ', ' ) | '"%sThreadID%": %sThreadMessage%' );
            nThread = nThread + 1;
        End;

        # Append the collected thread details as the next numbered entry of the multi-message list.
        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = ' "thread_details": {' | sThreadDetails | '}';
            sMultiMsg = Expand( '%sMultiMsg%' | If( LONG( sMultiMsg ) = 0, '', ', ' ) | '"' | sSequenceID | '":{%sMessageJSON%}' );
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    EndIf;

    # The JSON is built while sMessage still holds the value set before this region;
    # only afterwards is sMessage replaced by the result header.
    If( nMultiMsgJSON <= 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    EndIf;
    sMessage = sProcessResultMsg;

    nProcessReturnCode = 0;
    sProcessReturnCode = sProcessResultJSON;

    # Main thread: log the failure to the process response message process, then terminate.
    # NOTE(review): the main thread quits here, so on failure it never reaches the thread control
    # write-back below (the success path does write it for the main thread) - confirm this is intended.
    If( nThreadID = 0 );
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Error',
            'pSuccessMessage', '',
            'pFailureMessage', sProcessResultMsg,
            'pJSONMessage', sProcessReturnCode
        );
        LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );
        ProcessQuit();
    EndIf;

    # Threads that did not quit above: record the failure for this thread in the thread control cube.
    CellPutS( 'Error', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 1, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );

Else;

    # ---------------------------------------------------------------- success path
    If( nThreadID = 0 );
        # Main thread reports the total record count and gathers the child thread messages.
        nRecordCount = nTotalRecordCount;

        # NOTE(review): the loop stops at nThreads - 1 - confirm that child thread IDs run 1..nThreads-1 and not 1..nThreads.
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            sThreadDetails = Expand( '%sThreadDetails%' | If( LONG( sThreadDetails ) = 0, '', ', ' ) | '"%sThreadID%": %sThreadMessage%' );
            nThread = nThread + 1;
        End;

        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = ' "thread_details": {' | sThreadDetails | '}';
            sMultiMsg = Expand( '%sMultiMsg%' | If( LONG( sMultiMsg ) = 0, '', ', ' ) | '"' | sSequenceID | '":{%sMessageJSON%}' );
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    Else;
        # Other threads report the record count from nDataRecordCount.
        nRecordCount = nDataRecordCount;
    EndIf;

    # Elapsed time and throughput; the TI "\" operator returns 0 when the divisor is 0.
    nTime = NOW();
    nDeltaTime = ROUND( (nTime - nTimeStart) \ cSecFactor );
    sDeltaTime = NumberToString( nDeltaTime );
    #@FIX:588:Average throughput uses total record count as basis
    sRateAVG = NumberToString( nRecordCount \ nDeltaTime );
    sRecordCount = NumberToString( nRecordCount );

    # The message text is the same for single- and multi-threaded runs.
    If( nRecordCount = 0 );
        sMessage = Expand( 'No records were processed!' );
        sProcessResultMsg = Expand( 'Process has finished without errors.' );
    Else;
        #@FIX:588:Total records reporting fix
        sMessage = Expand( 'Successfully processed %sRecordCount% records in %sDeltaTime% s. Average throughput=%sRateAVG% records/s.' );
        sProcessResultMsg = Expand( 'Process has finished successfully.' );
    EndIf;

    If( nMultiMsgJSON <= 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    EndIf;

    nProcessReturnCode = 1;
    sProcessReturnCode = sProcessResultJSON;

    #@FIX:199:Write back to message cube just in case of main thread
    If( nThreadID = 0 );
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Success',
            'pSuccessMessage', sProcessResultMsg,
            'pFailureMessage', '',
            'pJSONMessage', sProcessReturnCode
        );
    EndIf;

    #@FIX:2020.11.1:230:Each thread will log its Status, Error Flag, Message and JSON Message into }APQ C3 Thread Control cube
    CellPutS( 'Success', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 0, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput('INFO', Expand( 'Process:%cThisProcName% (ThreadID=%pThreadID%): %sMessage%' ) );

EndIf;
#EndRegion - Return code & final error message handling