瀏覽代碼

#489 Run the synchro with parameter max_chunk_size to split the execution into several processes, each loading at most the given count of replica (note: the master process will continue to run while child processes are being forked one by one)

git-svn-id: http://svn.code.sf.net/p/itop/code/trunk@1732 a333f486-631f-4898-b8df-5754b55c2be0
romainq 13 年之前
父節點
當前提交
5ec63a30ca

+ 113 - 3
application/utils.inc.php

@@ -46,6 +46,7 @@ class utils
 
 	// Parameters loaded from a file, parameters of the page/command line still have precedence
 	private static $m_aParamsFromFile = null;
+	private static $m_aParamSource = array();
 
 	protected static function LoadParamFile($sParamFile)
 	{
@@ -82,6 +83,7 @@ class utils
 				$sParam = $aMatches[1];
 				$value = trim($aMatches[2]);
 				self::$m_aParamsFromFile[$sParam] = $value;
+				self::$m_aParamSource[$sParam] = $sParamFile;
 			}
 		}
 	}
@@ -99,6 +101,25 @@ class utils
 		}
 	}
 
+	/**
+	 * Return the source file from which the parameter has been found,
+	 * usefull when it comes to pass user credential to a process executed
+	 * in the background	 
+	 * @param $sName Parameter name
+	 * @return The file name if any, or null
+	 */
+	public static function GetParamSourceFile($sName)
+	{
+		if (array_key_exists($sName, self::$m_aParamSource))
+		{
+			return self::$m_aParamSource[$sName];
+		}
+		else
+		{
+			return null;
+		}
+	}
+
 	public static function IsModeCLI()
 	{
 		$sSAPIName = php_sapi_name();
@@ -152,10 +173,18 @@ class utils
 	
 	public static function Sanitize($value, $defaultValue, $sSanitizationFilter)
 	{
-		$retValue = self::Sanitize_Internal($value, $sSanitizationFilter);
-		if ($retValue === false)
+		if ($value === $defaultValue)
 		{
-			$retValue = $defaultValue;
+			// Preserve the real default value (can be used to detect missing mandatory parameters)
+			$retValue = $value;
+		}
+		else
+		{
+			$retValue = self::Sanitize_Internal($value, $sSanitizationFilter);
+			if ($retValue === false)
+			{
+				$retValue = $defaultValue;
+			}
 		}
 		return $retValue;		
 	}
@@ -601,5 +630,86 @@ class utils
 		}
 		echo "<p><pre>".print_r($aLightTrace, true)."</pre></p>\n";
 	 }
+
+	/**
+	 * Execute the given iTop PHP script, passing it the current credentials
+	 * Only CLI mode is supported, because of the need to hand the credentials over to the next process
+	 * Throws an exception if the execution fails or could not be attempted (config issue)
+	 * @param string $sScript Name and relative path to the file (relative to the iTop root dir)
+	 * @param hash $aArguments Associative array of 'arg' => 'value'
+	 * @return array(iCode, array(output lines))
+	 */	 	  
+	/**
+	 */
+	static function ExecITopScript($sScriptName, $aArguments)
+	{
+		$aDisabled = explode(', ', ini_get('disable_functions'));
+		if (in_array('exec', $aDisabled))
+		{
+			throw new Exception("The PHP exec() function has been disabled on this server");
+		}
+
+		$sPHPExec = trim(MetaModel::GetConfig()->Get('php_path'));
+		if (strlen($sPHPExec) == 0)
+		{
+			throw new Exception("The path to php must not be empty. Please set a value for 'php_path' in your configuration file.");
+		}
+
+		$sAuthUser = self::ReadParam('auth_user', '', 'raw_data');
+		$sAuthPwd = self::ReadParam('auth_pwd', '', 'raw_data');
+		$sParamFile = self::GetParamSourceFile('auth_user');
+		if (is_null($sParamFile))
+		{
+			$aArguments['auth_user'] = $sAuthUser;
+			$aArguments['auth_pwd'] = $sAuthPwd;
+		}
+		else
+		{
+			$aArguments['param_file'] = $sParamFile;
+		}
+		
+		$aArgs = array();
+		foreach($aArguments as $sName => $value)
+		{
+			// Note: See comment from the 23-Apr-2004 03:30 in the PHP documentation
+			//    It suggests to rely on pctnl_* function instead of using escapeshellargs
+			$aArgs[] = "--$sName=".escapeshellarg($value);
+		}
+		$sArgs = implode(' ', $aArgs);
+		
+		$sScript = realpath(APPROOT.$sScriptName);
+		if (!file_exists($sScript))
+		{
+			throw new Exception("Could not find the script file '$sScriptName' from the directory '".APPROOT."'");
+		}
+
+		$sCommand = '"'.$sPHPExec.'" '.escapeshellarg($sScript).' -- '.$sArgs;
+
+		if (version_compare(phpversion(), '5.3.0', '<'))
+		{
+			if (substr(PHP_OS,0,3) == 'WIN')
+			{
+				// Under Windows, and for PHP 5.2.x, the whole command has to be quoted
+				// Cf PHP doc: http://php.net/manual/fr/function.exec.php, comment from the 27-Dec-2010
+				$sCommand = '"'.$sCommand.'"';
+			}
+		}
+
+		$sLastLine = exec($sCommand, $aOutput, $iRes);
+		if ($iRes == 1)
+		{
+			throw new Exception(Dict::S('Core:ExecProcess:Code1')." - ".$sCommand);
+		}
+		elseif ($iRes == 255)
+		{
+			$sErrors = implode("\n", $aOutput);
+			throw new Exception(Dict::S('Core:ExecProcess:Code255')." - ".$sCommand.":\n".$sErrors);
+		}
+
+		//$aOutput[] = $sCommand;
+		return array($iRes, $aOutput);
+	}
+
+
 }
 ?>

+ 8 - 0
core/config.class.inc.php

@@ -124,6 +124,14 @@ class Config
 			'source_of_value' => '',
 			'show_in_conf_sample' => false,
 		),
+		'php_path' => array(
+			'type' => 'string',
+			'description' => 'Path to the php executable in CLI mode',
+			'default' => 'php',
+			'value' => 'php',
+			'source_of_value' => '',
+			'show_in_conf_sample' => true,
+		),
 		'session_name' => array(
 			'type' => 'string',
 			'description' => 'The name of the cookie used to store the PHP session id',

+ 2 - 2
core/dbobject.class.php

@@ -931,7 +931,7 @@ abstract class DBObject
 
 					$oDeletionPlan->AddToDelete($oReplica, DEL_SILENT);
 
-					if ($oDataSource->GetKey() == SynchroDataSource::GetCurrentTaskId())
+					if ($oDataSource->GetKey() == SynchroExecution::GetCurrentTaskId())
 					{
 						// The current task has the right to delete the object
 						continue;
@@ -1860,7 +1860,7 @@ abstract class DBObject
 		$oSet = $this->GetMasterReplica();
 		while($aData = $oSet->FetchAssoc())
 		{
-			if ($aData['datasource']->GetKey() == SynchroDataSource::GetCurrentTaskId())
+			if ($aData['datasource']->GetKey() == SynchroExecution::GetCurrentTaskId())
 			{
 				// Ignore the current task (check to write => ok)
 				continue;

+ 3 - 0
dictionaries/dictionary.itop.core.php

@@ -639,6 +639,7 @@ Dict::Add('EN US', 'English', 'English', array(
 	'Core:SyncDataSourceObsolete' => 'The data source is marked as obsolete. Operation cancelled.',
 	'Core:SyncDataSourceAccessRestriction' => 'Only adminstrators or the user specified in the data source can execute this operation. Operation cancelled.',
 	'Core:SyncTooManyMissingReplicas' => 'All records have been untouched for some time (all of the objects could be deleted). Please check that the process that writes into the synchronization table is still running. Operation cancelled.',
+	'Core:SyncSplitModeCLIOnly' => 'The synchronization can be executed in chunks only if run in mode CLI',
 	'Core:Synchro:ListReplicas_AllReplicas_Errors_Warnings' => '%1$s replicas, %2$s error(s), %3$s warning(s).',
 	'Core:SynchroReplica:TargetObject' => 'Synchronized Object: %1$s',
 	'Class:AsyncSendEmail' => 'Email (asynchronous)',
@@ -733,6 +734,8 @@ Dict::Add('EN US', 'English', 'English', array(
 	'Class:appUserPreferences' => 'User Preferences',
 	'Class:appUserPreferences/Attribute:userid' => 'User',
 	'Class:appUserPreferences/Attribute:preferences' => 'Prefs',
+	'Core:ExecProcess:Code1' => 'Wrong command or command finished with errors (e.g. wrong script name)',
+	'Core:ExecProcess:Code255' => 'PHP Error (parsing, or runtime)',
 ));
 
 //

+ 133 - 0
synchro/priv_sync_chunk.php

@@ -0,0 +1,133 @@
+<?php
+// Copyright (C) 2010 Combodo SARL
+//
+//   This program is free software; you can redistribute it and/or modify
+//   it under the terms of the GNU General Public License as published by
+//   the Free Software Foundation; version 3 of the License.
+//
+//   This program is distributed in the hope that it will be useful,
+//   but WITHOUT ANY WARRANTY; without even the implied warranty of
+//   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//   GNU General Public License for more details.
+//
+//   You should have received a copy of the GNU General Public License
+//   along with this program; if not, write to the Free Software
+//   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+/**
+ * Internal: synchronize part of the records - cannot be invoked separately 
+ *
+ * @author      Erwan Taloc <erwan.taloc@combodo.com>
+ * @author      Romain Quetiez <romain.quetiez@combodo.com>
+ * @author      Denis Flaven <denis.flaven@combodo.com>
+ * @license     http://www.opensource.org/licenses/gpl-3.0.html LGPL
+ */
+
+if (!defined('__DIR__')) define('__DIR__', dirname(__FILE__));
+require_once(__DIR__.'/../approot.inc.php');
+require_once(APPROOT.'/application/application.inc.php');
+require_once(APPROOT.'/application/webpage.class.inc.php');
+require_once(APPROOT.'/application/csvpage.class.inc.php');
+require_once(APPROOT.'/application/clipage.class.inc.php');
+
+require_once(APPROOT.'/application/startup.inc.php');
+
+
+function ReadMandatoryParam($oP, $sParam, $sSanitizationFilter = 'parameter')
+{
+	$sValue = utils::ReadParam($sParam, null, true /* Allow CLI */, $sSanitizationFilter);
+	if (is_null($sValue))
+	{
+		$oP->p("ERROR: Missing argument '$sParam'\n");
+		exit(29);
+	}
+	return trim($sValue);
+}
+
+/////////////////////////////////
+// Main program
+
+if (!utils::IsModeCLI())
+{
+	$oP = new WebPage(Dict::S("TitleSynchroExecution"));
+	$oP->p("This page is used internally by iTop");		
+	$oP->output();
+	exit -2;
+}
+
+$oP = new CLIPage(Dict::S("TitleSynchroExecution"));
+
+try
+{
+	utils::UseParamFile();
+}
+catch(Exception $e)
+{
+	$oP->p("Error: ".$e->GetMessage());
+	$oP->output();
+	exit -2;
+}
+
+// Next steps:
+//   specific arguments: 'csvfile'
+//   
+$sAuthUser = ReadMandatoryParam($oP, 'auth_user', 'raw_data');
+$sAuthPwd = ReadMandatoryParam($oP, 'auth_pwd', 'raw_data');
+if (UserRights::CheckCredentials($sAuthUser, $sAuthPwd))
+{
+	UserRights::Login($sAuthUser); // Login & set the user's language
+}
+else
+{
+	$oP->p("Access restricted or wrong credentials ('$sAuthUser')");
+	$oP->output();
+	exit -1;
+}
+
+$iStepCount = ReadMandatoryParam($oP, 'step_count');
+$oP->p('Executing a partial synchro - step '.$iStepCount);
+
+$iSource = ReadMandatoryParam($oP, 'source');
+
+$iStatLog = ReadMandatoryParam($oP, 'log');
+$iChange = ReadMandatoryParam($oP, 'change');
+$sLastFullLoad = ReadMandatoryParam($oP, 'last_full_load', 'raw_data');
+$iChunkSize = ReadMandatoryParam($oP, 'chunk');
+
+$oP->p('Last full load: '.$sLastFullLoad);
+$oP->p('Chunk size: '.$iChunkSize);
+$oP->p('Source: '.$iSource);
+
+try
+{
+	$oSynchroDataSource = MetaModel::GetObject('SynchroDataSource', $iSource);
+	$oLog = MetaModel::GetObject('SynchroLog', $iStatLog);
+	$oChange = MetaModel::GetObject('CMDBChange', $iChange);
+	
+	if (strlen($sLastFullLoad) > 0)
+	{
+		$oLastFullLoad = new DateTime($sLastFullLoad);
+		$oSynchroExec = new SynchroExecution($oSynchroDataSource, $oLastFullLoad);
+	}
+	else
+	{
+		$oSynchroExec = new SynchroExecution($oSynchroDataSource);
+	}
+	if ($oSynchroExec->DoSynchronizeChunk($oLog, $oChange, $iChunkSize))
+	{
+		$oP->p("continue");
+	}
+	else
+	{
+		$oP->p("done!");
+	}
+	$oP->output();
+}
+catch(Exception $e)
+{
+	$oP->p("Error: ".$e->GetMessage());
+	$oP->add($e->getTraceAsString());
+	$oP->output();
+	exit(28);
+}
+?>

+ 3 - 2
synchro/synchro_exec.php

@@ -51,7 +51,7 @@ function UsageAndExit($oP)
 	if ($bModeCLI)
 	{
 		$oP->p("USAGE:\n");
-		$oP->p("php -q synchro_exec.php --auth_user=<login> --auth_pwd=<password> --data_sources=<comma_separated_list_of_data_sources>\n");		
+		$oP->p("php -q synchro_exec.php --auth_user=<login> --auth_pwd=<password> --data_sources=<comma_separated_list_of_data_sources> [max_chunk_size=<limit the count of replica loaded in a single pass>]\n");		
 	}
 	else
 	{
@@ -147,7 +147,8 @@ foreach(explode(',', $sDataSourcesList) as $iSDS)
 		}
 		try
 		{
-			$oStatLog = $oSynchroDataSource->Synchronize(null);
+			$oSynchroExec = new SynchroExecution($oSynchroDataSource);
+			$oStatLog = $oSynchroExec->Process();
 			if ($bSimulate)
 			{
 				CMDBSource::Query('ROLLBACK');

+ 9 - 1
synchro/synchro_import.php

@@ -120,6 +120,13 @@ $aPageParams = array
 		'default' => 'summary',
 		'description' => '[retcode] to return the count of lines in error, [summary] to return a concise report, [details] to get a detailed report (each line listed)',
 	),
+	'max_chunk_size' => array
+	(
+		'mandatory' => false,
+		'modes' => 'cli',
+		'default' => '0',
+		'description' => 'Limit on the count of records that can be loaded at once while performing the synchronization',
+	),
 /*
 	'reportlevel' => array
 	(
@@ -621,7 +628,8 @@ try
 		//
 		if ($bSynchronize)
 		{
-			$oStatLog = $oDataSource->Synchronize($oLoadStartDate);
+			$oSynchroExec = new SynchroExecution($oDataSource, $oLoadStartDate);
+			$oStatLog = $oSynchroExec->Process();
 			$oP->add_comment('Synchronization---');
 			$oP->add_comment('------------------');
 			if ($sOutput == 'details')

+ 637 - 324
synchro/synchrodatasource.class.inc.php

@@ -98,19 +98,6 @@ class SynchroDataSource extends cmdbAbstractObject
 //		MetaModel::Init_SetZListItems('advanced_search', array('name')); // Criteria of the advanced search form
 	}
 
-	public static $m_oCurrentTask = null;
-	public static function GetCurrentTaskId()
-	{
-		if (is_object(self::$m_oCurrentTask))
-		{
-			return self::$m_oCurrentTask->GetKey();
-		}
-		else
-		{
-			return null;
-		}
-	}
-
 	public function DisplayBareRelations(WebPage $oPage, $bEditMode = false)
 	{
 		if (!$this->IsNew())
@@ -913,7 +900,8 @@ EOF
 
 		return $bFixNeeded;
 	}
-	protected function SendNotification($sSubject, $sBody)
+
+	public function SendNotification($sSubject, $sBody)
 	{
 		$iContact = $this->Get('notify_contact_id');
 		if ($iContact == 0)
@@ -958,315 +946,6 @@ EOF
 	}
 
 	/**
-	 * Perform a synchronization between the data stored in the replicas (&synchro_data_xxx_xx table)
-	 * and the iTop objects. If the lastFullLoadStartDate is NOT specified then the full_load_periodicity
-	 * is used to determine which records are obsolete.
-	 * @param Hash $aTraces Debugs/Trace information, one or more entries per replica
-	 * @param DateTime $oLastFullLoadStartDate Date of the last full load (start date/time), if known
-	 * @return void
-	 */
-	public function Synchronize($oLastFullLoadStartDate = null)
-	{
-		// Create a change used for logging all the modifications/creations happening during the synchro
-		$oMyChange = MetaModel::NewObject("CMDBChange");
-		$oMyChange->Set("date", time());
-		$sUserString = CMDBChange::GetCurrentUserName();
-		$oMyChange->Set("userinfo", $sUserString.' '.Dict::S('Core:SyncDataExchangeComment'));
-		$iChangeId = $oMyChange->DBInsert();
-
-		// Start logging this execution (stats + protection against reentrance)
-		//
-		$oStatLog = new SynchroLog();
-		$oStatLog->Set('sync_source_id', $this->GetKey());
-		$oStatLog->Set('start_date', time());
-		$oStatLog->Set('status', 'running');
-		$oStatLog->Set('stats_nb_replica_seen', 0);
-		$oStatLog->Set('stats_nb_replica_total', 0);
-		$oStatLog->Set('stats_nb_obj_deleted', 0);
-		$oStatLog->Set('stats_nb_obj_deleted_errors', 0);
-		$oStatLog->Set('stats_nb_obj_obsoleted', 0);
-		$oStatLog->Set('stats_nb_obj_obsoleted_errors', 0);
-		$oStatLog->Set('stats_nb_obj_created', 0);
-		$oStatLog->Set('stats_nb_obj_created_errors', 0);
-		$oStatLog->Set('stats_nb_obj_created_warnings', 0);
-		$oStatLog->Set('stats_nb_obj_updated', 0);
-		$oStatLog->Set('stats_nb_obj_updated_warnings', 0);
-		$oStatLog->Set('stats_nb_obj_updated_errors', 0);
-		$oStatLog->Set('stats_nb_obj_unchanged_warnings', 0);
-		//		$oStatLog->Set('stats_nb_replica_reconciled', 0);
-		$oStatLog->Set('stats_nb_replica_reconciled_errors', 0);
-		$oStatLog->Set('stats_nb_replica_disappeared_no_action', 0);
-		$oStatLog->Set('stats_nb_obj_new_updated', 0);
-		$oStatLog->Set('stats_nb_obj_new_updated_warnings', 0);
-		$oStatLog->Set('stats_nb_obj_new_unchanged',0);
-		$oStatLog->Set('stats_nb_obj_new_unchanged_warnings',0);
-		
-		$sSelectTotal  = "SELECT SynchroReplica WHERE sync_source_id = :source_id";
-		$oSetTotal = new DBObjectSet(DBObjectSearch::FromOQL($sSelectTotal), array() /* order by*/, array('source_id' => $this->GetKey()));
-		$oStatLog->Set('stats_nb_replica_total', $oSetTotal->Count());
-
-		$oStatLog->DBInsertTracked($oMyChange);
-
-		self::$m_oCurrentTask = $this;
-		try
-		{
-			$this->DoSynchronize($oLastFullLoadStartDate, $oMyChange, $oStatLog);
-			$oStatLog->Set('end_date', time());
-			$oStatLog->Set('status', 'completed');
-			$oStatLog->DBUpdateTracked($oMyChange);
-
-			$iErrors = $oStatLog->GetErrorCount();
-			if ($iErrors > 0)
-			{
-				$sIssuesOQL = "SELECT SynchroReplica WHERE sync_source_id=".$this->GetKey()." AND status_last_error!=''";
-				$sAbsoluteUrl = utils::GetAbsoluteUrlAppRoot();
-				$sIssuesURL = "{$sAbsoluteUrl}synchro/replica.php?operation=oql&datasource=".$this->GetKey()."&oql=".urlencode($sIssuesOQL);
-				$sSeeIssues = "<p></p>";
-
-				$sStatistics = "<h1>Statistics</h1>\n";
-				$sStatistics .= "<ul>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('start_date').": ".$oStatLog->Get('start_date')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('end_date').": ".$oStatLog->Get('end_date')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_replica_seen').": ".$oStatLog->Get('stats_nb_replica_seen')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_replica_total').": ".$oStatLog->Get('stats_nb_replica_total')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_deleted').": ".$oStatLog->Get('stats_nb_obj_deleted')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_deleted_errors').": ".$oStatLog->Get('stats_nb_obj_deleted_errors')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_obsoleted').": ".$oStatLog->Get('stats_nb_obj_obsoleted')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_obsoleted_errors').": ".$oStatLog->Get('stats_nb_obj_obsoleted_errors')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_created').": ".$oStatLog->Get('stats_nb_obj_created')." (".$oStatLog->Get('stats_nb_obj_created_warnings')." warnings)"."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_created_errors').": ".$oStatLog->Get('stats_nb_obj_created_errors')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_updated').": ".$oStatLog->Get('stats_nb_obj_updated')." (".$oStatLog->Get('stats_nb_obj_updated_warnings')." warnings)"."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_updated_errors').": ".$oStatLog->Get('stats_nb_obj_updated_errors')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_replica_reconciled_errors').": ".$oStatLog->Get('stats_nb_replica_reconciled_errors')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_replica_disappeared_no_action').": ".$oStatLog->Get('stats_nb_replica_disappeared_no_action')."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_new_updated').": ".$oStatLog->Get('stats_nb_obj_new_updated')." (".$oStatLog->Get('stats_nb_obj_new_updated_warnings')." warnings)"."</li>\n";
-				$sStatistics .= "<li>".$oStatLog->GetLabel('stats_nb_obj_new_unchanged').": ".$oStatLog->Get('stats_nb_obj_new_unchanged')." (".$oStatLog->Get('stats_nb_obj_new_unchanged_warnings')." warnings)"."</li>\n";
-				$sStatistics .= "</ul>\n";
-
-				$this->SendNotification("errors ($iErrors)", "<p>The synchronization has been executed, $iErrors errors have been encountered. Click <a href=\"$sIssuesURL\">here</a> to see the records being currently in error.</p>".$sStatistics);
-			}
-			else
-			{
-				//$this->SendNotification('success', '<p>The synchronization has been successfully executed.</p>');
-			}
-		}
-		catch (SynchroExceptionNotStarted $e)
-		{
-			// Set information for reporting... but delete the object in DB
-			$oStatLog->Set('end_date', time());
-			$oStatLog->Set('status', 'error');
-			$oStatLog->Set('last_error', $e->getMessage());
-			$oStatLog->DBDeleteTracked($oMyChange);
-			$this->SendNotification('fatal error', '<p>The synchronization could not start: \''.$e->getMessage().'\'</p><p>Please check its configuration</p>');
-		}
-		catch (Exception $e)
-		{
-			$oStatLog->Set('end_date', time());
-			$oStatLog->Set('status', 'error');
-			$oStatLog->Set('last_error', $e->getMessage());
-			$oStatLog->DBUpdateTracked($oMyChange);
-			$this->SendNotification('exception', '<p>The synchronization has been interrupted: \''.$e->getMessage().'\'</p><p>Please contact the application support team</p>');
-		}
-		self::$m_oCurrentTask = null;
-
-		return $oStatLog;
-	}
-
-	protected function DoSynchronize($oLastFullLoadStartDate, $oMyChange, &$oStatLog)
-	{
-		if ($this->Get('status') == 'obsolete')
-		{
-			throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceObsolete'));
-		}
-		if (!UserRights::IsAdministrator() && $this->Get('user_id') != UserRights::GetUserId())
-		{
-			throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceAccessRestriction'));
-		}
-
-		// Get the list of SQL columns
-		$sClass = $this->GetTargetClass();
-		$aAttCodesExpected = array();
-		$aAttCodesToReconcile = array();
-		$aAttCodesToUpdate = array();
-		$sSelectAtt  = "SELECT SynchroAttribute WHERE sync_source_id = :source_id AND (update = 1 OR reconcile = 1)";
-		$oSetAtt = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAtt), array() /* order by*/, array('source_id' => $this->GetKey()) /* aArgs */);
-		while ($oSyncAtt = $oSetAtt->Fetch())
-		{
-			if ($oSyncAtt->Get('update'))
-			{
-				$aAttCodesToUpdate[$oSyncAtt->Get('attcode')] = $oSyncAtt;
-			}
-			if ($oSyncAtt->Get('reconcile'))
-			{
-				$aAttCodesToReconcile[$oSyncAtt->Get('attcode')] = $oSyncAtt;
-			}
-			$aAttCodesExpected[$oSyncAtt->Get('attcode')] = $oSyncAtt;
-		}
-		$aColumns = $this->GetSQLColumns(array_keys($aAttCodesExpected));
-		$aExtDataFields = array_keys($aColumns);
-		$aExtDataFields[] = 'primary_key';
-		$aExtDataSpec = array(
-			'table' => $this->GetDataTable(),
-			'join_key' => 'id',
-			'fields' => $aExtDataFields
-		);
-
-		// Get the list of attributes, determine reconciliation keys and update targets
-		//
-		if ($this->Get('reconciliation_policy') == 'use_attributes')
-		{
-			$aReconciliationKeys = $aAttCodesToReconcile;
-		}
-		elseif ($this->Get('reconciliation_policy') == 'use_primary_key')
-		{
-			// Override the settings made at the attribute level !
-			$aReconciliationKeys = array("primary_key" => null);
-		}
-
-		$oStatLog->AddTrace("Update of: {".implode(', ', array_keys($aAttCodesToUpdate))."}");
-		$oStatLog->AddTrace("Reconciliation on: {".implode(', ', array_keys($aReconciliationKeys))."}");
-
-		if (count($aAttCodesToUpdate) == 0)
-		{
-			$oStatLog->AddTrace("No attribute to update");
-			throw new SynchroExceptionNotStarted('There is no attribute to update');
-		}
-		if (count($aReconciliationKeys) == 0)
-		{
-			$oStatLog->AddTrace("No attribute for reconciliation");
-			throw new SynchroExceptionNotStarted('No attribute for reconciliation');
-		}
-		
-		$aAttributes = array();
-		foreach($aAttCodesToUpdate as $sAttCode => $oSyncAtt)
-		{
-			$oAttDef = MetaModel::GetAttributeDef($this->GetTargetClass(), $sAttCode);
-			if ($oAttDef->IsWritable())
-			{
-				$aAttributes[$sAttCode] = $oSyncAtt;
-			}
-		}
-
-		// Count the replicas
-		$sSelectAll  = "SELECT SynchroReplica WHERE sync_source_id = :source_id";
-		$oSetAll = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAll), array() /* order by*/, array('source_id' => $this->GetKey()));
-		$iCountAllReplicas = $oSetAll->Count();
-		$oStatLog->Set('stats_nb_replica_total', $iCountAllReplicas);
-
-		// Get all the replicas that were not seen in the last import and mark them as obsolete
-		if ($oLastFullLoadStartDate == null)
-		{
-			// No previous import known, use the full_load_periodicity value... and the current date
-			$oLastFullLoadStartDate = new DateTime(); // Now
-			$iLoadPeriodicity = $this->Get('full_load_periodicity'); // Duration in seconds
-			if ($iLoadPeriodicity > 0)
-			{
-				$sInterval = "-$iLoadPeriodicity seconds";
-				$oLastFullLoadStartDate->Modify($sInterval);
-			}
-			else
-			{
-				$oLastFullLoadStartDate = new DateTime('1970-01-01');
-			}
-		}
-		$sLimitDate = $oLastFullLoadStartDate->Format('Y-m-d H:i:s');	
-		$oStatLog->AddTrace("Limit Date: $sLimitDate");
-
-		$sDeletePolicy = $this->Get('delete_policy');
-		
-		if ($sDeletePolicy != 'ignore')
-		{
-			$sSelectToObsolete  = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen < :last_import";
-			$oSetToObsolete = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate));
-			if (($iCountAllReplicas > 10) && ($iCountAllReplicas == $oSetToObsolete->Count()))
-			{
-				throw new SynchroExceptionNotStarted(Dict::S('Core:SyncTooManyMissingReplicas'));
-			} 
-			while($oReplica = $oSetToObsolete->Fetch())
-			{
-				switch ($sDeletePolicy)
-				{
-				case 'update':
-				case 'update_then_delete':
-					$oStatLog->AddTrace("Destination object to be updated", $oReplica);
-					$aToUpdate = array();
-					$aToUpdateSpec = explode(';', $this->Get('delete_policy_update')); //ex: 'status:obsolete;description:stopped',
-					foreach($aToUpdateSpec as $sUpdateSpec)
-					{
-						$aUpdateSpec = explode(':', $sUpdateSpec);
-						if (count($aUpdateSpec) == 2)
-						{
-							$sAttCode = $aUpdateSpec[0];
-							$sValue = $aUpdateSpec[1];
-							$aToUpdate[$sAttCode] = $sValue;
-						}
-					}
-					$oReplica->Set('status_last_error', '');
-					if ($oReplica->Get('dest_id') == '')
-					{
-						$oReplica->Set('status', 'obsolete');
-						$oStatLog->Inc('stats_nb_replica_disappeared_no_action');
-					}
-					else
-					{
-						$oReplica->UpdateDestObject($aToUpdate, $oMyChange, $oStatLog);
-						if ($oReplica->Get('status_last_error') == '')
-						{
-							// Change the status of the replica IIF
-							$oReplica->Set('status', 'obsolete');
-						}
-					}
-					$oReplica->DBUpdateTracked($oMyChange);
-					break;
-	
-	         	case 'delete':
-	         	default:
-					$oStatLog->AddTrace("Destination object to be DELETED", $oReplica);
-					$oReplica->DeleteDestObject($oMyChange, $oStatLog);
-				}
-			}
-		} // if ($sDeletePolicy != 'ignore'
-
-		//Count "seen" objects
-		$sSelectSeen  = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen >= :last_import";
-		$oSetSeen = new DBObjectSet(DBObjectSearch::FromOQL($sSelectSeen), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate));
-		$oStatLog->Set('stats_nb_replica_seen', $oSetSeen->Count());
-		
-		// Get all the replicas that are 'new' or modified or synchronized with a warning
-		//
-		$sSelectToSync  = "SELECT SynchroReplica WHERE (status = 'new' OR status = 'modified' OR (status = 'synchronized' AND status_last_warning != '')) AND sync_source_id = :source_id AND status_last_seen >= :last_import";
-		$oSetToSync = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate) /* aArgs */, $aExtDataSpec, 0 /* limitCount */, 0 /* limitStart */);
-
-		while($oReplica = $oSetToSync->Fetch())
-		{
-			$oReplica->Synchro($this, $aReconciliationKeys, $aAttributes, $oMyChange, $oStatLog);
-			$oReplica->DBUpdateTracked($oMyChange);			
-		}
-		
-		// Get all the replicas that are to be deleted
-		//
-		if ($sDeletePolicy == 'update_then_delete')
-		{
-			$oDeletionDate = $oLastFullLoadStartDate;
-			$iDeleteRetention = $this->Get('delete_policy_retention'); // Duration in seconds
-			if ($iDeleteRetention > 0)
-			{
-				$sInterval = "-$iDeleteRetention seconds";
-				$oDeletionDate->Modify($sInterval);
-			}
-			$sDeletionDate = $oDeletionDate->Format('Y-m-d H:i:s');	
-			$oStatLog->AddTrace("Deletion date: $sDeletionDate");
-			$sSelectToDelete  = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('obsolete') AND status_last_seen < :last_import";
-			$oSetToDelete = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sDeletionDate));
-			while($oReplica = $oSetToDelete->Fetch())
-			{
-				$oStatLog->AddTrace("Destination object to be DELETED", $oReplica);
-				$oReplica->DeleteDestObject($oMyChange, $oStatLog);
-			}
-		}
-	}
-
-	/**
 	 * Get the list of attributes eligible to the synchronization	 
 	 */
 	public function ListTargetAttributes()
@@ -1527,6 +1206,8 @@ class SynchroLog extends DBObject
 		MetaModel::Init_AddAttribute(new AttributeDateTime("start_date", array("allowed_values"=>null, "sql"=>"start_date", "default_value"=>"", "is_null_allowed"=>true, "depends_on"=>array())));
 		MetaModel::Init_AddAttribute(new AttributeDateTime("end_date", array("allowed_values"=>null, "sql"=>"end_date", "default_value"=>"", "is_null_allowed"=>true, "depends_on"=>array())));
 		MetaModel::Init_AddAttribute(new AttributeEnum("status", array("allowed_values"=>new ValueSetEnum('running,completed,error'), "sql"=>"status", "default_value"=>"running", "is_null_allowed"=>false, "depends_on"=>array())));
+		MetaModel::Init_AddAttribute(new AttributeInteger("status_curr_job", array("allowed_values"=>null, "sql"=>"status_curr_job", "default_value"=>0, "is_null_allowed"=>true, "depends_on"=>array())));
+		MetaModel::Init_AddAttribute(new AttributeInteger("status_curr_pos", array("allowed_values"=>null, "sql"=>"status_curr_pos", "default_value"=>0, "is_null_allowed"=>true, "depends_on"=>array())));
 
 		MetaModel::Init_AddAttribute(new AttributeInteger("stats_nb_replica_seen", array("allowed_values"=>null, "sql"=>"stats_nb_replica_seen", "default_value"=>0, "is_null_allowed"=>false, "depends_on"=>array())));
 		MetaModel::Init_AddAttribute(new AttributeInteger("stats_nb_replica_total", array("allowed_values"=>null, "sql"=>"stats_nb_replica_total", "default_value"=>0, "is_null_allowed"=>false, "depends_on"=>array())));
@@ -1626,14 +1307,26 @@ class SynchroLog extends DBObject
 			return;
 		}
 
+		$sPrevTrace = $this->Get('traces');
+
 		$oAttDef = MetaModel::GetAttributeDef(get_class($this), 'traces');
 		$iMaxSize = $oAttDef->GetMaxSize();
-		$sTrace = implode("\n", $this->m_aTraces);
+		if (strlen($sPrevTrace) > 0)
+		{
+			$sTrace = $sPrevTrace."\n".implode("\n", $this->m_aTraces);
+		}
+		else
+		{
+			$sTrace = implode("\n", $this->m_aTraces);
+		}
 		if (strlen($sTrace) >= $iMaxSize)
 		{
 			$sTrace = substr($sTrace, 0, $iMaxSize - 255)."...\nTruncated (size: ".strlen($sTrace).')';
 		}
 		$this->Set('traces', $sTrace);
+
+		//DBUpdate may be called many times... the operation should not be repeated
+		$this->m_aTraces = array();
 	}
 
 	protected function OnInsert()
@@ -2330,6 +2023,626 @@ class SynchroReplica extends DBObject implements iDisplay
 	}
 }
 
+/**
+ * Context of an ongoing synchronization
+ * Two usages:
+ * 1) Public usage: execute the synchronization
+ *    $oSynchroExec = new SynchroExecution($oDataSource[, $iLastFullLoad]);
+ *    $oSynchroExec->Process($iMaxChunkSize); 
+ *      
+ * 2) Internal usage: continue the synchronization (split into chunks, each performed in a separate process)
+ *    This is implemented in the page priv_sync_chunk.php 
+ *    $oSynchroExec = SynchroExecution::Resume($oDataSource, $iLastFullLoad, $iSynchroLog, $iChange, $iMaxToProcess, $iJob, $iNextInJob);    
+ *    $oSynchroExec->Process() 
+ */	
+class SynchroExecution
+{
+	protected $m_oDataSource = null;
+	protected $m_oLastFullLoadStartDate = null;
+
+	protected $m_oChange = null;
+	protected $m_oStatLog = null;
+
+	// Context computed one for optimization and report inconsistencies ASAP
+	protected $m_aExtDataSpec = array();
+	protected $m_aReconciliationKeys = array();
+	protected $m_aAttributes = array();
+	protected $m_iCountAllReplicas = 0;
+
+	/**
+	 * Constructor
+	 * @param SynchroDataSource $oDataSource Synchronization task
+	 * @param DateTime $oLastFullLoadStartDate Date of the last full load (start date/time), if known
+	 * @return void
+	 */
+	public function __construct($oDataSource, $oLastFullLoadStartDate = null)
+	{
+		$this->m_oDataSource = $oDataSource;
+		$this->m_oLastFullLoadStartDate = $oLastFullLoadStartDate;
+	}
+
+	/**
+	* Create the persistant information records, for the current synchronization
+	* In fact, those records ARE defining what is the "current" synchronization	
+	*/	
+	protected function PrepareLogs()
+	{
+		if (!is_null($this->m_oChange))
+		{
+			return;
+		}
+
+		// Create a change used for logging all the modifications/creations happening during the synchro
+		$this->m_oChange = MetaModel::NewObject("CMDBChange");
+		$this->m_oChange->Set("date", time());
+		$sUserString = CMDBChange::GetCurrentUserName();
+		$this->m_oChange->Set("userinfo", $sUserString.' '.Dict::S('Core:SyncDataExchangeComment'));
+		$iChangeId = $this->m_oChange->DBInsert();
+
+		// Start logging this execution (stats + protection against reentrance)
+		//
+		$this->m_oStatLog = new SynchroLog();
+		$this->m_oStatLog->Set('sync_source_id', $this->m_oDataSource->GetKey());
+		$this->m_oStatLog->Set('start_date', time());
+		$this->m_oStatLog->Set('status', 'running');
+		$this->m_oStatLog->Set('stats_nb_replica_seen', 0);
+		$this->m_oStatLog->Set('stats_nb_replica_total', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_deleted', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_deleted_errors', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_obsoleted', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_obsoleted_errors', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_created', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_created_errors', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_created_warnings', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_updated', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_updated_warnings', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_updated_errors', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_unchanged_warnings', 0);
+		//		$this->m_oStatLog->Set('stats_nb_replica_reconciled', 0);
+		$this->m_oStatLog->Set('stats_nb_replica_reconciled_errors', 0);
+		$this->m_oStatLog->Set('stats_nb_replica_disappeared_no_action', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_new_updated', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_new_updated_warnings', 0);
+		$this->m_oStatLog->Set('stats_nb_obj_new_unchanged',0);
+		$this->m_oStatLog->Set('stats_nb_obj_new_unchanged_warnings',0);
+		
+		$sSelectTotal  = "SELECT SynchroReplica WHERE sync_source_id = :source_id";
+		$oSetTotal = new DBObjectSet(DBObjectSearch::FromOQL($sSelectTotal), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey()));
+		$this->m_oStatLog->Set('stats_nb_replica_total', $oSetTotal->Count());
+
+		$this->m_oStatLog->DBInsertTracked($this->m_oChange);
+	}
+
+	/**
+	* Prevent against the reentrance... or allow the current task to do things forbidden by the others !
+	*/	
+	public static $m_oCurrentTask = null;
+	public static function GetCurrentTaskId()
+	{
+		if (is_object(self::$m_oCurrentTask))
+		{
+			return self::$m_oCurrentTask->GetKey();
+		}
+		else
+		{
+			return null;
+		}
+	}
+
+	/**
+	* Prepare structures in memory, to speedup the processing of a given replica
+	*/	
+	public function PrepareProcessing($bFirstPass = true)
+	{
+		if ($this->m_oDataSource->Get('status') == 'obsolete')
+		{
+			throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceObsolete'));
+		}
+		if (!UserRights::IsAdministrator() && $this->m_oDataSource->Get('user_id') != UserRights::GetUserId())
+		{
+			throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceAccessRestriction'));
+		}
+
+		// Get the list of SQL columns
+		$sClass = $this->m_oDataSource->GetTargetClass();
+		$aAttCodesExpected = array();
+		$aAttCodesToReconcile = array();
+		$aAttCodesToUpdate = array();
+		$sSelectAtt  = "SELECT SynchroAttribute WHERE sync_source_id = :source_id AND (update = 1 OR reconcile = 1)";
+		$oSetAtt = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAtt), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey()) /* aArgs */);
+		while ($oSyncAtt = $oSetAtt->Fetch())
+		{
+			if ($oSyncAtt->Get('update'))
+			{
+				$aAttCodesToUpdate[$oSyncAtt->Get('attcode')] = $oSyncAtt;
+			}
+			if ($oSyncAtt->Get('reconcile'))
+			{
+				$aAttCodesToReconcile[$oSyncAtt->Get('attcode')] = $oSyncAtt;
+			}
+			$aAttCodesExpected[$oSyncAtt->Get('attcode')] = $oSyncAtt;
+		}
+		$aColumns = $this->m_oDataSource->GetSQLColumns(array_keys($aAttCodesExpected));
+		$aExtDataFields = array_keys($aColumns);
+		$aExtDataFields[] = 'primary_key';
+
+		$this->m_aExtDataSpec = array(
+			'table' => $this->m_oDataSource->GetDataTable(),
+			'join_key' => 'id',
+			'fields' => $aExtDataFields
+		);
+
+		// Get the list of attributes, determine reconciliation keys and update targets
+		//
+		if ($this->m_oDataSource->Get('reconciliation_policy') == 'use_attributes')
+		{
+			$this->m_aReconciliationKeys = $aAttCodesToReconcile;
+		}
+		elseif ($this->m_oDataSource->Get('reconciliation_policy') == 'use_primary_key')
+		{
+			// Override the settings made at the attribute level !
+			$this->m_aReconciliationKeys = array("primary_key" => null);
+		}
+
+		if ($bFirstPass)
+		{
+			$this->m_oStatLog->AddTrace("Update of: {".implode(', ', array_keys($aAttCodesToUpdate))."}");
+			$this->m_oStatLog->AddTrace("Reconciliation on: {".implode(', ', array_keys($this->m_aReconciliationKeys))."}");
+		}
+
+		if (count($aAttCodesToUpdate) == 0)
+		{
+			$this->m_oStatLog->AddTrace("No attribute to update");
+			throw new SynchroExceptionNotStarted('There is no attribute to update');
+		}
+		if (count($this->m_aReconciliationKeys) == 0)
+		{
+			$this->m_oStatLog->AddTrace("No attribute for reconciliation");
+			throw new SynchroExceptionNotStarted('No attribute for reconciliation');
+		}
+		
+		$this->m_aAttributes = array();
+		foreach($aAttCodesToUpdate as $sAttCode => $oSyncAtt)
+		{
+			$oAttDef = MetaModel::GetAttributeDef($this->m_oDataSource->GetTargetClass(), $sAttCode);
+			if ($oAttDef->IsWritable())
+			{
+				$this->m_aAttributes[$sAttCode] = $oSyncAtt;
+			}
+		}
+
+		// Count the replicas
+		$sSelectAll  = "SELECT SynchroReplica WHERE sync_source_id = :source_id";
+		$oSetAll = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAll), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey()));
+		$this->m_iCountAllReplicas = $oSetAll->Count();
+		$this->m_oStatLog->Set('stats_nb_replica_total', $this->m_iCountAllReplicas);
+
+		// Compute and keep track of the limit date taken into account for obsoleting replicas
+		//
+		if ($this->m_oLastFullLoadStartDate == null)
+		{
+			// No previous import known, use the full_load_periodicity value... and the current date
+			$this->m_oLastFullLoadStartDate = new DateTime(); // Now
+			$iLoadPeriodicity = $this->m_oDataSource->Get('full_load_periodicity'); // Duration in seconds
+			if ($iLoadPeriodicity > 0)
+			{
+				$sInterval = "-$iLoadPeriodicity seconds";
+				$this->m_oLastFullLoadStartDate->Modify($sInterval);
+			}
+			else
+			{
+				$this->m_oLastFullLoadStartDate = new DateTime('1970-01-01');
+			}
+		}
+		if ($bFirstPass)
+		{
+			$this->m_oStatLog->AddTrace("Limit Date: ".$this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s'));
+		}
+	}
+
+
+	/**
+	 * Perform a synchronization between the data stored in the replicas (&synchro_data_xxx_xx table)
+	 * and the iTop objects. If the lastFullLoadStartDate is NOT specified then the full_load_periodicity
+	 * is used to determine which records are obsolete.
+	 * @return void
+	 */
+	public function Process()
+	{
+		$this->PrepareLogs();
+
+		self::$m_oCurrentTask = $this->m_oDataSource;
+		try
+		{
+			$this->DoSynchronize();
+
+			$this->m_oStatLog->Set('end_date', time());
+			$this->m_oStatLog->Set('status', 'completed');
+			$this->m_oStatLog->DBUpdateTracked($this->m_oChange);
+
+			$iErrors = $this->m_oStatLog->GetErrorCount();
+			if ($iErrors > 0)
+			{
+				$sIssuesOQL = "SELECT SynchroReplica WHERE sync_source_id=".$this->m_oDataSource->GetKey()." AND status_last_error!=''";
+				$sAbsoluteUrl = utils::GetAbsoluteUrlAppRoot();
+				$sIssuesURL = "{$sAbsoluteUrl}synchro/replica.php?operation=oql&datasource=".$this->m_oDataSource->GetKey()."&oql=".urlencode($sIssuesOQL);
+				$sSeeIssues = "<p></p>";
+
+				$sStatistics = "<h1>Statistics</h1>\n";
+				$sStatistics .= "<ul>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('start_date').": ".$this->m_oStatLog->Get('start_date')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('end_date').": ".$this->m_oStatLog->Get('end_date')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_replica_seen').": ".$this->m_oStatLog->Get('stats_nb_replica_seen')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_replica_total').": ".$this->m_oStatLog->Get('stats_nb_replica_total')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_deleted').": ".$this->m_oStatLog->Get('stats_nb_obj_deleted')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_deleted_errors').": ".$this->m_oStatLog->Get('stats_nb_obj_deleted_errors')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_obsoleted').": ".$this->m_oStatLog->Get('stats_nb_obj_obsoleted')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_obsoleted_errors').": ".$this->m_oStatLog->Get('stats_nb_obj_obsoleted_errors')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_created').": ".$this->m_oStatLog->Get('stats_nb_obj_created')." (".$this->m_oStatLog->Get('stats_nb_obj_created_warnings')." warnings)"."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_created_errors').": ".$this->m_oStatLog->Get('stats_nb_obj_created_errors')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_updated').": ".$this->m_oStatLog->Get('stats_nb_obj_updated')." (".$this->m_oStatLog->Get('stats_nb_obj_updated_warnings')." warnings)"."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_updated_errors').": ".$this->m_oStatLog->Get('stats_nb_obj_updated_errors')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_replica_reconciled_errors').": ".$this->m_oStatLog->Get('stats_nb_replica_reconciled_errors')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_replica_disappeared_no_action').": ".$this->m_oStatLog->Get('stats_nb_replica_disappeared_no_action')."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_new_updated').": ".$this->m_oStatLog->Get('stats_nb_obj_new_updated')." (".$this->m_oStatLog->Get('stats_nb_obj_new_updated_warnings')." warnings)"."</li>\n";
+				$sStatistics .= "<li>".$this->m_oStatLog->GetLabel('stats_nb_obj_new_unchanged').": ".$this->m_oStatLog->Get('stats_nb_obj_new_unchanged')." (".$this->m_oStatLog->Get('stats_nb_obj_new_unchanged_warnings')." warnings)"."</li>\n";
+				$sStatistics .= "</ul>\n";
+
+				$this->m_oDataSource->SendNotification("errors ($iErrors)", "<p>The synchronization has been executed, $iErrors errors have been encountered. Click <a href=\"$sIssuesURL\">here</a> to see the records being currently in error.</p>".$sStatistics);
+			}
+			else
+			{
+				//$this->m_oDataSource->SendNotification('success', '<p>The synchronization has been successfully executed.</p>');
+			}
+		}
+		catch (SynchroExceptionNotStarted $e)
+		{
+			// Set information for reporting... but delete the object in DB
+			$this->m_oStatLog->Set('end_date', time());
+			$this->m_oStatLog->Set('status', 'error');
+			$this->m_oStatLog->Set('last_error', $e->getMessage());
+			$this->m_oStatLog->DBDeleteTracked($this->m_oChange);
+			$this->m_oDataSource->SendNotification('fatal error', '<p>The synchronization could not start: \''.$e->getMessage().'\'</p><p>Please check its configuration</p>');
+		}
+		catch (Exception $e)
+		{
+			$this->m_oStatLog->Set('end_date', time());
+			$this->m_oStatLog->Set('status', 'error');
+			$this->m_oStatLog->Set('last_error', $e->getMessage());
+			$this->m_oStatLog->DBUpdateTracked($this->m_oChange);
+			$this->m_oDataSource->SendNotification('exception', '<p>The synchronization has been interrupted: \''.$e->getMessage().'\'</p><p>Please contact the application support team</p>');
+		}
+		self::$m_oCurrentTask = null;
+
+		return $this->m_oStatLog;
+	}
+
+	/**
+	 * Do the entire synchronization job
+	 */
+	protected function DoSynchronize()
+	{
+		$this->m_oStatLog->Set('status_curr_job', 1);
+		$this->m_oStatLog->Set('status_curr_pos', -1);
+
+		$iMaxChunkSize = utils::ReadParam('max_chunk_size', 0, true /* allow CLI */);
+		if ($iMaxChunkSize > 0)
+		{
+			// Split the execution into several processes
+			// Each process will call DoSynchronizeChunk()
+			// The loop will end when a process does not reply "continue" on the last line of its output
+			if (!utils::IsModeCLI())
+			{
+				throw new SynchroExceptionNotStarted(Dict::S('Core:SyncSplitModeCLIOnly'));
+			}
+			$aArguments = array();
+			$aArguments['source'] = $this->m_oDataSource->GetKey();
+			$aArguments['log'] = $this->m_oStatLog->GetKey();
+			$aArguments['change'] = $this->m_oChange->GetKey();
+			$aArguments['chunk'] = $iMaxChunkSize;
+			if ($this->m_oLastFullLoadStartDate)
+			{
+				$aArguments['last_full_load'] = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s');
+			}
+			else
+			{
+				$aArguments['last_full_load'] = '';
+			}
+
+			$this->m_oStatLog->DBUpdate($this->m_oChange);
+
+			$iStepCount = 0;
+			do
+			{
+				$aArguments['step_count'] = $iStepCount;
+				$iStepCount++;
+
+				list ($iRes, $aOut) = utils::ExecITopScript('synchro/priv_sync_chunk.php', $aArguments);
+	
+				$sLastRes = strtolower(trim(end($aOut)));
+				$bContinue = ($sLastRes == 'continue');
+			}
+			while ($bContinue);
+			
+			// Reload the log that has been modified by the processes
+			$this->m_oStatLog->Reload();
+		}
+		else
+		{
+			$this->PrepareProcessing(/* first pass */);
+			$this->DoJob1();
+			$this->DoJob2();
+			$this->DoJob3();
+		}
+	}
+
+	/**
+	 * Do the synchronization job, limited to some amount of work
+	 * This verb has been designed to be called from within a separate process	 
+	 * @return true if the process has to be continued
+	 */
+	public function DoSynchronizeChunk($oLog, $oChange, $iMaxChunkSize)
+	{
+		// Initialize the structures...
+		self::$m_oCurrentTask = $this->m_oDataSource;
+		$this->m_oStatLog = $oLog;
+		$this->m_oChange = $oChange;
+
+		// Prepare internal structures (not the first pass)
+		$this->PrepareProcessing(false);
+
+		$iCurrJob = $this->m_oStatLog->Get('status_curr_job');
+		$iCurrPos = $this->m_oStatLog->Get('status_curr_pos');
+
+		$this->m_oStatLog->AddTrace("Synchronizing chunk - curr_job:$iCurrJob, curr_pos:$iCurrPos, max_chunk_size:$iMaxChunkSize");
+
+		$bContinue = false;
+		switch ($iCurrJob)
+		{
+			case 1:
+			default:
+				$this->DoJob1($iMaxChunkSize, $iCurrPos);
+				$bContinue = true;
+				break;
+
+			case 2:
+				$this->DoJob2($iMaxChunkSize, $iCurrPos);
+				$bContinue = true;
+				break;
+
+			case 3:
+				$bContinue = $this->DoJob3($iMaxChunkSize, $iCurrPos);
+				break;
+		}
+		$this->m_oStatLog->DBUpdate($this->m_oChange);
+		self::$m_oCurrentTask = null;
+		return $bContinue;
+	}
+
+	/**
+	 * Do the synchronization job #1: Obsolete replica "untouched" for some time
+	 * @param integer $iMaxReplica Limit the number of replicas to process 
+	 * @param integer $iCurrPos Current position where to resume the processing 
+	 * @return true if the process must be continued
+	 */
+	protected function DoJob1($iMaxReplica = null, $iCurrPos = -1)
+	{
+		$sLimitDate = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s');
+
+		// Get all the replicas that were not seen in the last import and mark them as obsolete
+		$sDeletePolicy = $this->m_oDataSource->Get('delete_policy');
+		if ($sDeletePolicy != 'ignore')
+		{
+			$sSelectToObsolete  = "SELECT SynchroReplica WHERE id > :curr_pos AND sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen < :last_import";
+			$oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos));
+			$iCountScope = $oSetScope->Count();
+			if (($this->m_iCountAllReplicas > 10) && ($this->m_iCountAllReplicas == $iCountScope))
+			{
+				throw new SynchroExceptionNotStarted(Dict::S('Core:SyncTooManyMissingReplicas'));
+			} 
+
+			if ($iMaxReplica)
+			{
+				// Re-build the object set and set a LIMIT
+				$oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos));
+				$oSetToProcess->SetLimit($iMaxReplica);
+			}
+			else
+			{
+				$oSetToProcess = $oSetScope;
+			}
+
+			$iLastReplicaProcessed = -1;
+			while($oReplica = $oSetToProcess->Fetch())
+			{
+				$iLastReplicaProcessed = $oReplica->GetKey();
+				switch ($sDeletePolicy)
+				{
+				case 'update':
+				case 'update_then_delete':
+					$this->m_oStatLog->AddTrace("Destination object to be updated", $oReplica);
+					$aToUpdate = array();
+					$aToUpdateSpec = explode(';', $this->m_oDataSource->Get('delete_policy_update')); //ex: 'status:obsolete;description:stopped',
+					foreach($aToUpdateSpec as $sUpdateSpec)
+					{
+						$aUpdateSpec = explode(':', $sUpdateSpec);
+						if (count($aUpdateSpec) == 2)
+						{
+							$sAttCode = $aUpdateSpec[0];
+							$sValue = $aUpdateSpec[1];
+							$aToUpdate[$sAttCode] = $sValue;
+						}
+					}
+					$oReplica->Set('status_last_error', '');
+					if ($oReplica->Get('dest_id') == '')
+					{
+						$oReplica->Set('status', 'obsolete');
+						$this->m_oStatLog->Inc('stats_nb_replica_disappeared_no_action');
+					}
+					else
+					{
+						$oReplica->UpdateDestObject($aToUpdate, $this->m_oChange, $this->m_oStatLog);
+						if ($oReplica->Get('status_last_error') == '')
+						{
+							// Change the status of the replica IIF
+							$oReplica->Set('status', 'obsolete');
+						}
+					}
+					$oReplica->DBUpdateTracked($this->m_oChange);
+					break;
+	
+	         	case 'delete':
+	         	default:
+					$this->m_oStatLog->AddTrace("Destination object to be DELETED", $oReplica);
+					$oReplica->DeleteDestObject($this->m_oChange, $this->m_oStatLog);
+				}
+			}
+			if ($iMaxReplica)
+			{
+				if ($iMaxReplica < $iCountScope)
+				{
+					// Continue with this job!
+					$this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed);
+					return true;
+				}
+			}
+		} // if ($sDeletePolicy != 'ignore'
+
+		//Count "seen" objects
+		$sSelectSeen  = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen >= :last_import";
+		$oSetSeen = new DBObjectSet(DBObjectSearch::FromOQL($sSelectSeen), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate));
+		$this->m_oStatLog->Set('stats_nb_replica_seen', $oSetSeen->Count());
+
+
+		// Job complete!
+		$this->m_oStatLog->Set('status_curr_job', 2);
+		$this->m_oStatLog->Set('status_curr_pos', -1);
+		return false;
+	}
+
+	/**
+	 * Do the synchronization job #2: Create and modify object for new/modified replicas
+	 * @param integer $iMaxReplica Limit the number of replicas to process 
+	 * @param integer $iCurrPos Current position where to resume the processing 
+	 * @return true if the process must be continued
+	 */
+	protected function DoJob2($iMaxReplica = null, $iCurrPos = -1)
+	{
+		$sLimitDate = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s');
+
+		// Get all the replicas that are 'new' or modified or synchronized with a warning
+		//
+		$sSelectToSync  = "SELECT SynchroReplica WHERE id > :curr_pos AND (status = 'new' OR status = 'modified' OR (status = 'synchronized' AND status_last_warning != '')) AND sync_source_id = :source_id AND status_last_seen >= :last_import";
+		$oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos), $this->m_aExtDataSpec);
+		$iCountScope = $oSetScope->Count();
+
+		if ($iMaxReplica)
+		{
+			// Re-build the object set and set a LIMIT
+			$oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos), $this->m_aExtDataSpec);
+			$oSetToProcess->SetLimit($iMaxReplica);
+		}
+		else
+		{
+			$oSetToProcess = $oSetScope;
+		}
+
+		$iLastReplicaProcessed = -1;
+		while($oReplica = $oSetToProcess->Fetch())
+		{
+			$iLastReplicaProcessed = $oReplica->GetKey();
+			$oReplica->Synchro($this->m_oDataSource, $this->m_aReconciliationKeys, $this->m_aAttributes, $this->m_oChange, $this->m_oStatLog);
+			$oReplica->DBUpdateTracked($this->m_oChange);			
+		}
+		
+		if ($iMaxReplica)
+		{
+			if ($iMaxReplica < $iCountScope)
+			{
+				// Continue with this job!
+				$this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed);
+				return true;
+			}
+		}
+
+		// Job complete!
+		$this->m_oStatLog->Set('status_curr_job', 3);
+		$this->m_oStatLog->Set('status_curr_pos', -1);
+		return false;
+	}
+
+	/**
+	 * Do the synchronization job #3: Delete replica depending on the obsolescence scheme
+	 * @param integer $iMaxReplica Limit the number of replicas to process 
+	 * @param integer $iCurrPos Current position where to resume the processing 
+	 * @return true if the process must be continued
+	 */
+	protected function DoJob3($iMaxReplica = null, $iCurrPos = -1)
+	{
+		$sDeletePolicy = $this->m_oDataSource->Get('delete_policy');
+		if ($sDeletePolicy != 'update_then_delete')
+		{
+			// Job complete!
+			$this->m_oStatLog->Set('status_curr_job', 0);
+			$this->m_oStatLog->Set('status_curr_pos', -1);
+			return false;
+		}
+
+		$bFirstPass = ($iCurrPos == -1);
+
+		// Get all the replicas that are to be deleted
+		//
+		$oDeletionDate = $this->m_oLastFullLoadStartDate;
+		$iDeleteRetention = $this->m_oDataSource->Get('delete_policy_retention'); // Duration in seconds
+		if ($iDeleteRetention > 0)
+		{
+			$sInterval = "-$iDeleteRetention seconds";
+			$oDeletionDate->Modify($sInterval);
+		}
+		$sDeletionDate = $oDeletionDate->Format('Y-m-d H:i:s');	
+		if ($bFirstPass)
+		{
+			$this->m_oStatLog->AddTrace("Deletion date: $sDeletionDate");
+		}
+		$sSelectToDelete  = "SELECT SynchroReplica WHERE id > :curr_pos AND sync_source_id = :source_id AND status IN ('obsolete') AND status_last_seen < :last_import";
+		$oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sDeletionDate, 'curr_pos' => $iCurrPos));
+		$iCountScope = $oSetScope->Count();
+
+		if ($iMaxReplica)
+		{
+			// Re-build the object set and set a LIMIT
+			$oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sDeletionDate, 'curr_pos' => $iCurrPos));
+			$oSetToProcess->SetLimit($iMaxReplica);
+		}
+		else
+		{
+			$oSetToProcess = $oSetScope;
+		}
+
+		$iLastReplicaProcessed = -1;
+		while($oReplica = $oSetToProcess->Fetch())
+		{
+			$iLastReplicaProcessed = $oReplica->GetKey();
+			$this->m_oStatLog->AddTrace("Destination object to be DELETED", $oReplica);
+			$oReplica->DeleteDestObject($this->m_oChange, $this->m_oStatLog);
+		}
+
+		if ($iMaxReplica)
+		{
+			if ($iMaxReplica < $iCountScope)
+			{
+				// Continue with this job!
+				$this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed);
+				return true;
+			}
+		}
+		// Job complete!
+		$this->m_oStatLog->Set('status_curr_job', 0);
+		$this->m_oStatLog->Set('status_curr_pos', -1);
+		return false;
+	}
+}
+
 	$oAdminMenu = new MenuGroup('AdminTools', 80 /* fRank */, 'SynchroDataSource', UR_ACTION_MODIFY, UR_ALLOWED_YES);
 	new OQLMenuNode('DataSources', 'SELECT SynchroDataSource', $oAdminMenu->GetIndex(), 12 /* fRank */, true, 'SynchroDataSource', UR_ACTION_MODIFY, UR_ALLOWED_YES);
 //	new OQLMenuNode('Replicas', 'SELECT SynchroReplica', $oAdminMenu->GetIndex(), 12 /* fRank */, true, 'SynchroReplica', UR_ACTION_MODIFY, UR_ALLOWED_YES);

+ 4 - 4
test/testlist.inc.php

@@ -2271,8 +2271,8 @@ class TestDataExchange extends TestBizModel
 					),
 				),
 			),
-		//);
-		//$aXXXXScenarios = array(
+//		);
+//		$aXXXXScenarios = array(
 			array(
 				'desc' => 'Update then delete with retention (to complete with manual testing) and reconciliation on org/name',
 				'login' => 'admin',
@@ -2361,7 +2361,7 @@ class TestDataExchange extends TestBizModel
 					),
 					array(
 						array('obj_C', 2, 'obj_C', 'production'),
-				),
+					),
 				),
 				'target_data' => array(
 					array('org_id', 'name', 'status'),
@@ -2403,7 +2403,7 @@ class TestDataExchange extends TestBizModel
 						array(2, 'obj_B', 'implementation'),
 						array(2, 'obj_C', 'production'),
 						array(2, 'obj_D', 'obsolete'),
-				),
+					),
 				),
 				'attributes' => array(
 					'org_id' => array(