Final Message Preparation for Parallel Enabled Processes

This snippet should be used to conclude the execution of parallel-enabled processes. Every thread, including the main process, writes its messages to the }APQ C3 Thread Control cube; the main process then consolidates all child-thread messages into a single message in the }APQ Process Response Message cube.

Prerequisite:

Prolog:

  • cCubThreadCtrl = '}APQ C3 Thread Control';

  • cThisProcName = GetProcessName();

  • cUserName = TM1User(); If( DIMIX( '}Clients', cUserName ) = 0 ); cUserName = 'Admin'; EndIf;

Epilog:

  • Provide a variable holding the number of failed child threads. This code comes from the parallelization snippets (see the parallelization chapter):

    nChildErrors = CellGetN( cCubThreadCtrl, cUserName, cThisProcName, 'Total Threads', 'Error Flag' );

Epilog:

#Region - Return code & final error message handling

# -----------------------------------------------------------------------------
# Final message preparation for a parallel-enabled process (Epilog).
#
# Builds the plain-text result (sProcessResultMsg / sMessage) and the JSON
# result (sProcessResultJSON / sProcessReturnCode) and publishes them:
#   * the main thread (nThreadID = 0) folds the 'JSON Message' cells of the
#     other threads into sMultiMsg and hands the consolidated result to
#     }APQ.Cub.ProcessResponseMessage.ImmediateLog;
#   * each thread records Status / error flag / message / JSON Message in the
#     thread control cube - except the main thread on the error path, which
#     calls ProcessQuit() before those writes are reached.
#
# Read here but set elsewhere in the process:
#   cCubThreadCtrl, cUserName, cThisProcName, cSecFactor,
#   cMsgErrorLevel, cMsgErrorContent,
#   nErrors, nChildErrors, nThreads, nThreadID, pThreadID,
#   nMultiMsgJSON, sMultiMsg, sMessage, sProcLogParams,
#   nProcessStartTime, nTimeStart, nTotalRecordCount, nDataRecordCount
# -----------------------------------------------------------------------------

# 'success' only when neither this thread nor any child thread reported errors.
sProcessResultType = If( ( nErrors <> 0 ) % ( nChildErrors <> 0 ), 'danger', 'success' );

If( ( nErrors > 0 ) % ( nChildErrors > 0 ) );

    # ----------------------------- failure path ------------------------------

    If( nErrors > 0 );
        # Errors raised by this thread itself take precedence over child errors.
        sErrors = NumberToString( nErrors );
        sProcessResultMsg = Expand( 'Process has finished with %sErrors% error' | If( nErrors > 1, 's', '') | '.' );
    Else;
        # Only reachable with nChildErrors > 0 (guaranteed by the outer condition).
        sErrors = NumberToString( nChildErrors );
        If( nChildErrors = 1 );
            sProcessResultMsg = Expand( 'One of the child processes has finished with errors. Please check details for more information.' );
        Else;
            sProcessResultMsg = Expand( '%sErrors% child processes have finished with errors. Please check details for more information.' );
        EndIf;

        # Collect the JSON message of every child thread as "<thread id>": <json>.
        # NOTE(review): the loop covers threads 1 .. nThreads - 1; confirm that
        # nThreads is meant to include the main thread (id 0).
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            sThreadDetails = Expand( '%sThreadDetails%' | If(LONG(sThreadDetails)=0, '', ', ') | '"%sThreadID%": %sThreadMessage%' );
            nThread = nThread + 1;
        End;

        # Append the collected thread details as the next numbered entry of sMultiMsg.
        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = INSRT(' "thread_details": {', sThreadDetails | '}', 1);
            sMultiMsg = Expand( '%sMultiMsg%' | If(LONG(sMultiMsg)=0, '', ', ') | '"' | NumberToString( nMultiMsgJSON ) | '":{%sMessageJSON%}' );
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    EndIf;

    # The JSON is built while sMessage still holds its previous value;
    # sMessage is replaced by the header text only afterwards.
    If( nMultiMsgJSON > 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    EndIf;
    sMessage = sProcessResultMsg;

    nProcessReturnCode = 0;
    sProcessReturnCode = sProcessResultJSON;

    # Main thread: publish the consolidated failure, log it and stop right here.
    If( nThreadID = 0 );
        sMessage = sProcessResultMsg;
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Error',
            'pSuccessMessage', '',
            'pFailureMessage', sProcessResultMsg,
            'pJSONMessage', sProcessReturnCode );
        LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );
        ProcessQuit();
    EndIf;

    # Reached by child threads only: record the failure in the thread control cube.
    CellPutS( 'Error', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 1, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) );

Else;

    # ----------------------------- success path ------------------------------

    If( nThreadID = 0 );
        # Main thread reports the grand total and gathers the child messages.
        nRecordCount = nTotalRecordCount;

        # NOTE(review): same loop bound as on the failure path (1 .. nThreads - 1).
        nThread = 1;
        sThreadDetails = '';
        While( nThread < nThreads );
            sThreadID = NumberToString( nThread );
            sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' );
            sThreadDetails = Expand( '%sThreadDetails%' | If(LONG(sThreadDetails)=0, '', ', ') | '"%sThreadID%": %sThreadMessage%' );
            nThread = nThread + 1;
        End;

        If( sThreadDetails @<> '' );
            sSequenceID = NumberToString( nMultiMsgJSON );
            sMessageJSON = INSRT(' "thread_details": {', sThreadDetails | '}', 1);
            sMultiMsg = Expand( '%sMultiMsg%' | If(LONG(sMultiMsg)=0, '', ', ') | '"' | NumberToString( nMultiMsgJSON ) | '":{%sMessageJSON%}' );
            nMultiMsgJSON = nMultiMsgJSON + 1;
        EndIf;
    Else;
        # A child thread reports only the records it handled itself.
        nRecordCount = nDataRecordCount;
    EndIf;

    # Elapsed time and average throughput. Both divisions use the '\' operator.
    nTime = NOW();
    nDeltaTime = ROUND( (nTime - nTimeStart) \ cSecFactor );
    sDeltaTime = NumberToString( nDeltaTime );
    #@FIX:588:Average throughput uses total record count as basis
    sRateAVG = NumberToString( nRecordCount \ nDeltaTime );
    sRecordCount = NumberToString( nRecordCount );

    # The wording is the same for single-threaded and multi-threaded runs.
    If( nRecordCount = 0 );
        sMessage = Expand( 'No records were processed!' );
        sProcessResultMsg = Expand( 'Process has finished without errors.' );
    Else;
        #@FIX:588:Total records reporting fix
        sMessage = Expand( 'Successfully processed %sRecordCount% records in %sDeltaTime% s. Average throughput=%sRateAVG% records/s.' );
        sProcessResultMsg = Expand( 'Process has finished successfully.' );
    EndIf;

    If( nMultiMsgJSON > 1 );
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' );
    Else;
        sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' );
    EndIf;

    nProcessReturnCode = 1;
    sProcessReturnCode = sProcessResultJSON;

    #@FIX:199:Write back to message cube just in case of main thread
    If( nThreadID = 0 );
        RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog',
            'pProcLogParams', sProcLogParams,
            'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'),
            'pProcessFinishTime', 'now',
            'pUserName', cUserName,
            'pProcessName', cThisProcName,
            'pProcessErrorFile', GetProcessErrorFileName(),
            'pStatus', 'Success',
            'pSuccessMessage', sProcessResultMsg,
            'pFailureMessage', '',
            'pJSONMessage', sProcessReturnCode );
    EndIf;

    #@FIX:2020.11.1:230:Each thread will log its Status, Error Flag, Message and JSON Message into }APQ C3 Thread Control cube
    CellPutS( 'Success', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' );
    CellPutN( 0, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' );
    CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' );
    CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' );
    LogOutput('INFO', Expand( 'Process:%cThisProcName% (ThreadID=%pThreadID%): %sMessage%' ) );

EndIf;

#EndRegion - Return code & final error message handling