/
Final Message Preparation for Parallel Enabled Processes

Final Message Preparation for Parallel Enabled Processes

This snippet should be used to conclude execution of parallel enabled processes. The process will output messages into }APQ C3 Thread Control cube for each thread including main process, then the main process will consolidate all child thread messages into a single message in the }APQ Process Response Message cube.

Prerequisite:

Prolog:

  • cCubThreadCtrl = '}APQ C3 Thread Control';

  • cThisProcName = GetProcessName();

  • cUserName = TM1User(); If( DIMIX( '}Clients', cUserName ) = 0 ); cUserName = 'Admin'; EndIf;

Epilog:

  • Provide variable indicating number of failed child threads - this code comes from parallelization snippets (see parallelization chapter)

    nChildErrors = CellGetN( cCubThreadCtrl, cUserName, cThisProcName, 'Total Threads', 'Error Flag' );

Epilog:

#Region - Return code & final error message handling sProcessResultType = If( nErrors = 0 & nChildErrors = 0, 'success', 'danger' ); If( nErrors > 0 % nChildErrors > 0 ); If( nErrors > 0 ); sErrors = NumberToString( nErrors ); sProcessResultMsg = Expand( 'Process has finished with %sErrors% error' | If( nErrors > 1, 's', '') | '.' ); If( nMultiMsgJSON <= 1 ); sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' ); Else; sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' ); EndIf; sMessage = sProcessResultMsg; ElseIf( nChildErrors > 0 ); sErrors = NumberToString( nChildErrors ); If( nChildErrors = 1 ); sProcessResultMsg = Expand( 'One of the child processes has finished with errors. Please check details for more information.' ); Else; sProcessResultMsg = Expand( '%sErrors% child processes have finished with errors. Please check details for more information.' ); EndIf; nThread = 1; sThreadDetails = ''; While ( nThread < nThreads ); sThreadID = NumberToString( nThread ); sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' ); sThreadDetails = Expand( '%sThreadDetails%' | If(LONG(sThreadDetails)=0, '', ', ') | '"%sThreadID%": %sThreadMessage%' ); nThread = nThread + 1; End; If( sThreadDetails @<> '' ); sSequenceID = NumberToString( nMultiMsgJSON ); sMessageJSON = INSRT(' "thread_details": {', sThreadDetails | '}', 1); sMultiMsg = Expand( '%sMultiMsg%' | If(LONG(sMultiMsg)=0, '', ', ') | '"' | NumberToString( nMultiMsgJSON ) | '":{%sMessageJSON%}' ); nMultiMsgJSON = nMultiMsgJSON + 1; EndIf; If( nMultiMsgJSON <= 1 ); sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' ); Else; sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' ); EndIf; sMessage = sProcessResultMsg; EndIf; nProcessReturnCode = 0; sProcessReturnCode = sProcessResultJSON; If( nThreadID = 0 ); sMessage = sProcessResultMsg; RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog', 'pProcLogParams', sProcLogParams, 'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'), 'pProcessFinishTime', 'now', 'pUserName', cUserName, 'pProcessName', cThisProcName, 'pProcessErrorFile', GetProcessErrorFileName(), 'pStatus', 'Error', 'pSuccessMessage', '', 'pFailureMessage', sProcessResultMsg, 'pJSONMessage', sProcessReturnCode ); LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) ); ProcessQuit(); EndIf; CellPutS( 'Error', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' ); CellPutN( 1, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' ); CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' ); CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' ); LogOutput( cMsgErrorLevel, Expand( cMsgErrorContent ) ); Else; If( nThreadID = 0 ); nRecordCount = nTotalRecordCount; nThread = 1; sThreadDetails = ''; While ( nThread < nThreads ); sThreadID = NumberToString( nThread ); sThreadMessage = CellGetS( cCubThreadCtrl, cUserName, cThisProcName, sThreadID, 'JSON Message' ); sThreadDetails = Expand( '%sThreadDetails%' | If(LONG(sThreadDetails)=0, '', ', ') | '"%sThreadID%": %sThreadMessage%' ); nThread = nThread + 1; End; If( sThreadDetails @<> '' ); sSequenceID = NumberToString( nMultiMsgJSON ); sMessageJSON = INSRT(' "thread_details": {', sThreadDetails | '}', 1); sMultiMsg = Expand( '%sMultiMsg%' | If(LONG(sMultiMsg)=0, '', ', ') | '"' | NumberToString( nMultiMsgJSON ) | '":{%sMessageJSON%}' ); nMultiMsgJSON = nMultiMsgJSON + 1; EndIf; Else; nRecordCount = nDataRecordCount; EndIf; nTime = NOW(); nDeltaTime = ROUND( (nTime - nTimeStart) \ cSecFactor ); sDeltaTime = NumberToString( nDeltaTime ); #@FIX:588:Average throughput uses total record count as basis sRateAVG = NumberToString( nRecordCount \ nDeltaTime ); sRecordCount = NumberToString( nRecordCount ); If( nThreads > 1 ); If( nRecordCount = 0 ); sMessage = Expand( 'No records were processed!' ); sProcessResultMsg = Expand( 'Process has finished without errors.' ); Else; #@FIX:588:Total records reporting fix sMessage = Expand( 'Successfully processed %sRecordCount% records in %sDeltaTime% s. Average throughput=%sRateAVG% records/s.' ); sProcessResultMsg = Expand( 'Process has finished successfully.' ); EndIf; Else; If( nRecordCount = 0 ); sMessage = Expand( 'No records were processed!' ); sProcessResultMsg = Expand( 'Process has finished without errors.' ); Else; #@FIX:588:Total records reporting fix sMessage = Expand( 'Successfully processed %sRecordCount% records in %sDeltaTime% s. Average throughput=%sRateAVG% records/s.' ); sProcessResultMsg = Expand( 'Process has finished successfully.' ); EndIf; EndIf; If( nMultiMsgJSON <= 1 ); sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "type":"%sProcessResultType%"}' ); Else; sProcessResultJSON = Expand( '{"process":"%cThisProcName%", "header": "%sProcessResultMsg%", "message":"%sMessage%", "details":{%sMultiMsg%}, "type":"%sProcessResultType%"}' ); EndIf; nProcessReturnCode = 1; sProcessReturnCode = sProcessResultJSON; #@FIX:199:Write back to message cube just in case of main thread If( nThreadID = 0 ); nProcessReturnCode = 1; sProcessReturnCode = sProcessResultJSON; RunProcess( '}APQ.Cub.ProcessResponseMessage.ImmediateLog', 'pProcLogParams', sProcLogParams, 'pProcessStartTime', TimSt(nProcessStartTime, '\Y-\m-\d \h:\i:\s'), 'pProcessFinishTime', 'now', 'pUserName', cUserName, 'pProcessName', cThisProcName, 'pProcessErrorFile', GetProcessErrorFileName(), 'pStatus', 'Success', 'pSuccessMessage', sProcessResultMsg, 'pFailureMessage', '', 'pJSONMessage', sProcessReturnCode ); EndIf; #@FIX:2020.11.1:230:Each thread will log its Status, Error Flag, Message and JSON Message into }APQ C3 Thread Control cube CellPutS( 'Success', cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'Status' ); CellPutN( 0, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'error flag' ); CellPutS( sProcessResultMsg, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'message' ); CellPutS( sProcessReturnCode, cCubThreadCtrl, cUserName, cThisProcName, pThreadID, 'JSON Message' ); LogOutput('INFO', Expand( 'Process:%cThisProcName% (ThreadID=%pThreadID%): %sMessage%' ) ); EndIf; #EndRegion - Return code & final error message handling