modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/discovery/DiscoveryServerService.java | 17 -
modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java | 149 +++++++---
modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/discovery/DiscoveryServerServiceImpl.java | 32 +-
3 files changed, 161 insertions(+), 37 deletions(-)
New commits:
commit 4d3f721141b42c1e1ac6eed3a93124f51fbae3e6
Author: Jay Shaughnessy <jshaughn(a)jshaughn.csb>
Date: Thu Jan 31 15:43:28 2013 -0500
Try to batch unknown resource merge in the inventory sync to
help with large inventories and an agent start --purgeData
- also fix issues with server service annotations
diff --git a/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/discovery/DiscoveryServerService.java b/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/discovery/DiscoveryServerService.java
index 6b2fba6..04915ae 100644
--- a/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/discovery/DiscoveryServerService.java
+++ b/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/discovery/DiscoveryServerService.java
@@ -22,6 +22,7 @@
*/
package org.rhq.core.clientapi.server.discovery;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,8 +64,8 @@ public interface DiscoveryServerService {
@LimitedConcurrency(CONCURRENCY_LIMIT_INVENTORY_REPORT)
@Timeout(0L)
// should be something like 1000L * 60 * 30 but until we can be assured we never take longer, disable timeout
- ResourceSyncInfo mergeInventoryReport(InventoryReport inventoryReport)
- throws InvalidInventoryReportException, StaleTypeException;
+ ResourceSyncInfo mergeInventoryReport(InventoryReport inventoryReport) throws InvalidInventoryReportException,
+ StaleTypeException;
/**
* Merges a new availability report from the agent into the server. This updates the availability statuses of known
@@ -90,9 +91,19 @@ public interface DiscoveryServerService {
* @param includeDescendants
* @return a tree of resources with the latest data
*/
+ @LimitedConcurrency(CONCURRENCY_LIMIT_INVENTORY_SYNC)
Set<Resource> getResources(Set<Integer> resourceIds, boolean includeDescendants);
/**
+ * Returns the Resources with the given id's. The children are not set.
+ *
+ * @param resourceIds
+ * @return a list of resources in the same order as the passed in ids, with the latest data
+ */
+ @LimitedConcurrency(CONCURRENCY_LIMIT_INVENTORY_SYNC)
+ List<Resource> getResourcesAsList(List<Integer> resourceIds);
+
+ /**
* Set the specified resource enabled or disabled. The call has no effect if the resource is already
* in the desired state.
*
@@ -158,7 +169,7 @@ public interface DiscoveryServerService {
* @return details on what resources have been upgraded with what data.
*/
Set<ResourceUpgradeResponse> upgradeResources(Set<ResourceUpgradeRequest> upgradeRequests);
-
+
/**
* Gives the server a chance to apply any necessary post-processing that's needed for newly committed resources
* that have been successfully synchronized on the agent.
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
index eaca661..780eaf8 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
@@ -120,6 +120,7 @@ import org.rhq.core.pluginapi.upgrade.ResourceUpgradeContext;
import org.rhq.core.pluginapi.upgrade.ResourceUpgradeFacet;
import org.rhq.core.system.SystemInfo;
import org.rhq.core.system.SystemInfoFactory;
+import org.rhq.core.util.StopWatch;
import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.core.util.exception.WrappedRemotingException;
@@ -851,8 +852,7 @@ public class InventoryManager extends AgentService implements ContainerService,
// it will be accessible and editable by the user. Report the start exception at the end.
handleInvalidPluginConfigurationResourceError(resource, t);
throw new PluginContainerException("The resource [" + resource
- + "] has been added but could not be started. Verify the supplied configuration values: ",
- t);
+ + "] has been added but could not be started. Verify the supplied configuration values: ", t);
}
}
@@ -1108,7 +1108,7 @@ public class InventoryManager extends AgentService implements ContainerService,
log.info("Syncing local inventory with Server inventory...");
long startTime = System.currentTimeMillis();
Set<Resource> syncedResources = new LinkedHashSet<Resource>();
- Set<Integer> unknownResourceIds = new LinkedHashSet<Integer>();
+ Set<ResourceSyncInfo> unknownResourceSyncInfos = new LinkedHashSet<ResourceSyncInfo>();
Set<Integer> modifiedResourceIds = new LinkedHashSet<Integer>();
Set<Integer> deletedResourceIds = new LinkedHashSet<Integer>();
Set<Resource> newlyCommittedResources = new LinkedHashSet<Resource>();
@@ -1124,16 +1124,16 @@ public class InventoryManager extends AgentService implements ContainerService,
}
log.debug("Processing Server sync info...");
- processSyncInfo(syncInfo, syncedResources, unknownResourceIds, modifiedResourceIds, deletedResourceIds,
- newlyCommittedResources);
+ processSyncInfo(syncInfo, syncedResources, unknownResourceSyncInfos, modifiedResourceIds,
+ deletedResourceIds, newlyCommittedResources);
if (log.isDebugEnabled()) {
log.debug(String.format("DONE Processing sync info - took [%d] ms - synced [%d] Resources "
+ "- found [%d] unknown Resources and [%d] modified Resources.",
- (System.currentTimeMillis() - startTime), syncedResources.size(), unknownResourceIds.size(),
+ (System.currentTimeMillis() - startTime), syncedResources.size(), unknownResourceSyncInfos.size(),
modifiedResourceIds.size()));
}
- mergeUnknownResources(unknownResourceIds);
+ mergeUnknownResources(unknownResourceSyncInfos);
mergeModifiedResources(modifiedResourceIds);
if (!partialInventory) {
purgeObsoleteResources(allServerSideUuids);
@@ -1157,7 +1157,7 @@ public class InventoryManager extends AgentService implements ContainerService,
// the upgrade phase. Not to mention the fact that no thread pools are initialized yet by the
// time the upgrade kicks in..
if (!isResourceUpgradeActive()
- && (!syncedResources.isEmpty() || !unknownResourceIds.isEmpty() || !modifiedResourceIds.isEmpty())) {
+ && (!syncedResources.isEmpty() || !unknownResourceSyncInfos.isEmpty() || !modifiedResourceIds.isEmpty())) {
performAvailabilityChecks(true);
this.inventoryThreadPoolExecutor.schedule((Callable<? extends Object>) this.serviceScanExecutor,
configuration.getChildResourceDiscoveryDelay(), TimeUnit.SECONDS);
@@ -1610,8 +1610,9 @@ public class InventoryManager extends AgentService implements ContainerService,
* @throws PluginContainerException
* @return true the resource has been successfully prepared and can be started. False if the resource should not be started.
*/
- private boolean prepareResourceForActivation(Resource resource, @NotNull ResourceContainer container,
- boolean forceReinitialization) throws InvalidPluginConfigurationException, PluginContainerException {
+ private boolean prepareResourceForActivation(Resource resource, @NotNull
+ ResourceContainer container, boolean forceReinitialization) throws InvalidPluginConfigurationException,
+ PluginContainerException {
if (resourceUpgradeDelegate.hasUpgradeFailed(resource)) {
if (log.isTraceEnabled()) {
@@ -1753,8 +1754,9 @@ public class InventoryManager extends AgentService implements ContainerService,
* plugin configuration
* @throws PluginContainerException for all other errors
*/
- public void activateResource(Resource resource, @NotNull ResourceContainer container, boolean updatedPluginConfig)
- throws InvalidPluginConfigurationException, PluginContainerException {
+ public void activateResource(Resource resource, @NotNull
+ ResourceContainer container, boolean updatedPluginConfig) throws InvalidPluginConfigurationException,
+ PluginContainerException {
if (resourceUpgradeDelegate.hasUpgradeFailed(resource)) {
if (log.isTraceEnabled()) {
@@ -1831,8 +1833,7 @@ public class InventoryManager extends AgentService implements ContainerService,
getOperationContext(resource), // for operation manager access
getContentContext(resource), // for content manager access
getAvailabilityContext(resource, this.availabilityCollectors), // for components that want to perform async avail checking
- getInventoryContext(resource),
- this.configuration.getPluginContainerDeployment()); // helps components make determinations of what to do
+ getInventoryContext(resource), this.configuration.getPluginContainerDeployment()); // helps components make determinations of what to do
}
public <T extends ResourceComponent<?>> ResourceUpgradeContext<T> createResourceUpgradeContext(Resource resource,
@@ -1851,8 +1852,7 @@ public class InventoryManager extends AgentService implements ContainerService,
getOperationContext(resource), // for operation manager access
getContentContext(resource), // for content manager access
getAvailabilityContext(resource, this.availabilityCollectors), // for components that want avail manager access
- getInventoryContext(resource),
- this.configuration.getPluginContainerDeployment()); // helps components make determinations of what to do
+ getInventoryContext(resource), this.configuration.getPluginContainerDeployment()); // helps components make determinations of what to do
}
/**
@@ -2753,8 +2753,8 @@ public class InventoryManager extends AgentService implements ContainerService,
}
private void processSyncInfo(ResourceSyncInfo syncInfo, Set<Resource> syncedResources,
- Set<Integer> unknownResourceIds, Set<Integer> modifiedResourceIds, Set<Integer> deletedResourceIds,
- Set<Resource> newlyCommittedResources) {
+ Set<ResourceSyncInfo> unknownResourceSyncInfos, Set<Integer> modifiedResourceIds,
+ Set<Integer> deletedResourceIds, Set<Resource> newlyCommittedResources) {
if (InventoryStatus.DELETED == syncInfo.getInventoryStatus()) {
// A previously deleted resource still being reported by the server. Support for this option can
// be removed if the server is ever modified to not report deleted resources. It is happening currently
@@ -2765,7 +2765,7 @@ public class InventoryManager extends AgentService implements ContainerService,
ResourceContainer container = this.resourceContainers.get(syncInfo.getUuid());
if (container == null) {
// Either a manually added Resource or just something we haven't discovered.
- unknownResourceIds.add(syncInfo.getId());
+ unknownResourceSyncInfos.add(syncInfo);
log.info("Got unknown resource: " + syncInfo.getId());
} else {
Resource resource = container.getResource();
@@ -2811,7 +2811,7 @@ public class InventoryManager extends AgentService implements ContainerService,
// Recurse...
for (ResourceSyncInfo childSyncInfo : syncInfo.getChildSyncInfos()) {
- processSyncInfo(childSyncInfo, syncedResources, unknownResourceIds, modifiedResourceIds,
+ processSyncInfo(childSyncInfo, syncedResources, unknownResourceSyncInfos, modifiedResourceIds,
deletedResourceIds, newlyCommittedResources);
}
}
@@ -2831,18 +2831,16 @@ public class InventoryManager extends AgentService implements ContainerService,
}
}
- private void mergeUnknownResources(Set<Integer> unknownResourceIds) {
- if (log.isDebugEnabled()) {
- log.debug("Merging [" + unknownResourceIds.size()
- + "] unknown Resources and their descendants into local inventory...");
- }
+ private void mergeUnknownResources(Set<ResourceSyncInfo> unknownResourceSyncInfos) {
+ //TODO if (log.isDebugEnabled()) {
+ log.info("Merging [" + unknownResourceSyncInfos.size()
+ + "] unknown Resources and their descendants into local inventory...");
+ //}
- if (!unknownResourceIds.isEmpty()) {
+ if (!unknownResourceSyncInfos.isEmpty()) {
PluginMetadataManager pmm = this.pluginManager.getMetadataManager();
- Set<Resource> unknownResources = configuration.getServerServices().getDiscoveryServerService()
- .getResources(unknownResourceIds, true);
-
+ Set<Resource> unknownResources = getResourcesFromSyncInfos(unknownResourceSyncInfos);
Set<Integer> toBeIgnored = new HashSet<Integer>();
for (Resource unknownResource : unknownResources) {
@@ -2860,11 +2858,102 @@ public class InventoryManager extends AgentService implements ContainerService,
}
}
- unknownResourceIds.removeAll(toBeIgnored);
+ unknownResourceSyncInfos.removeAll(toBeIgnored);
}
return;
}
+ private Set<Resource> getResourcesFromSyncInfos(Set<ResourceSyncInfo> syncInfos) {
+
+ Set<Resource> result = new HashSet<Resource>(syncInfos.size());
+
+ for (ResourceSyncInfo syncInfo : syncInfos) {
+ Resource resource = getResourceFromSyncInfo(syncInfo);
+ result.add(resource);
+ }
+
+ return result;
+ }
+
+ static final int BATCH_SIZE = 500;
+
+ private Resource getResourceFromSyncInfo(ResourceSyncInfo syncInfo) {
+ Resource result;
+
+ List<Integer> resourceIdList = treeToBreadthFirstList(syncInfo);
+
+ Map<Integer, Resource> resourceMap = new HashMap<Integer, Resource>(resourceIdList.size());
+
+ while (!resourceIdList.isEmpty()) {
+ int size = resourceIdList.size();
+ int end = (BATCH_SIZE < size) ? BATCH_SIZE : size;
+
+ List<Integer> resourceIdBatch = resourceIdList.subList(0, end);
+ List<Resource> resourceBatch = configuration.getServerServices().getDiscoveryServerService()
+ .getResourcesAsList(resourceIdBatch);
+
+ // Advance our progress and possibly help GC. This will remove the processed resources from the backing list
+ resourceIdBatch.clear();
+
+ // add the newly fetched resources to the end of our master list
+ for (Resource r : resourceBatch) {
+ resourceMap.put(r.getId(), r);
+ }
+
+ // Again, help the GC
+ resourceBatch.clear();
+ }
+
+ if (resourceIdList.size() != resourceMap.size()) {
+ log.warn("Expected [" + resourceIdList.size() + "] but found [" + resourceMap.size()
+ + "] resources when fetching from server");
+ }
+
+ StopWatch stopWatch = new StopWatch();
+
+ result = syncInfoTreeToResourceTree(syncInfo, null, resourceMap);
+
+ //TODO if (log.isDebugEnabled()) {
+ log.info("syncInfoTreeToResourceTree time=[" + stopWatch + "]");
+ //}
+
+ return result;
+ }
+
+ private Resource syncInfoTreeToResourceTree(ResourceSyncInfo syncInfo, Resource parentResource,
+ Map<Integer, Resource> resourceMap) {
+ Resource result = resourceMap.get(syncInfo.getId());
+
+ if (null == result || null == syncInfo.getChildSyncInfos()) {
+ return result;
+ }
+
+ for (ResourceSyncInfo child : syncInfo.getChildSyncInfos()) {
+ Resource childResource = syncInfoTreeToResourceTree(child, result, resourceMap);
+ if (null != childResource) {
+ result.addChildResource(childResource);
+ }
+ }
+
+ return result;
+ }
+
+ private List<Integer> treeToBreadthFirstList(ResourceSyncInfo syncInfo) {
+ List<Integer> result = new ArrayList<Integer>();
+
+ LinkedList<ResourceSyncInfo> queue = new LinkedList<ResourceSyncInfo>();
+ queue.add(syncInfo);
+ while (!queue.isEmpty()) {
+ ResourceSyncInfo node = queue.remove();
+ result.add(node.getId());
+ for (ResourceSyncInfo child : node.getChildSyncInfos()) {
+ queue.add(child);
+ }
+ }
+
+ return result;
+ }
+
// private void print(Resource resourceTreeNode, int level) {
// StringBuilder builder = new StringBuilder();
// for (int i = 0; i < level; i++) {
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/discovery/DiscoveryServerServiceImpl.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/discovery/DiscoveryServerServiceImpl.java
index ebc0cd1..fe03746 100644
--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/discovery/DiscoveryServerServiceImpl.java
+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/discovery/DiscoveryServerServiceImpl.java
@@ -18,7 +18,9 @@
*/
package org.rhq.enterprise.server.discovery;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -130,10 +132,12 @@ public class DiscoveryServerServiceImpl implements DiscoveryServerService {
long elapsed = (System.currentTimeMillis() - start);
if (elapsed > 20000L) {
- log.warn("Performance: processed " + reportToString + " - needFull=[" + !ok + "] in (" + elapsed + ")ms");
+ log.warn("Performance: processed " + reportToString + " - needFull=[" + !ok + "] in (" + elapsed
+ + ")ms");
} else {
if (log.isDebugEnabled()) {
- log.debug("Performance: processed " + reportToString + " - needFull=[" + !ok + "] in (" + elapsed + ")ms");
+ log.debug("Performance: processed " + reportToString + " - needFull=[" + !ok + "] in (" + elapsed
+ + ")ms");
}
}
@@ -167,6 +171,26 @@ public class DiscoveryServerServiceImpl implements DiscoveryServerService {
}
@Override
+ public List<Resource> getResourcesAsList(List<Integer> resourceIds) {
+ long start = System.currentTimeMillis();
+ ResourceManagerLocal resourceManager = LookupUtil.getResourceManager();
+ List<Resource> result = new ArrayList<Resource>(resourceIds.size());
+ for (Integer resourceId : resourceIds) {
+ //TODO: This can probably be one call to resource criteria fetch
+ Resource resource = resourceManager.getResourceTree(resourceId, false);
+ if (isVisibleInInventory(resource)) {
+ resource = convertToPojoResource(resource, false);
+ result.add(resource);
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Performance: get ResourcesAsList [" + resourceIds + "], timing ("
+ + (System.currentTimeMillis() - start) + ")ms");
+ }
+ return result;
+ }
+
+ @Override
public Map<Integer, InventoryStatus> getInventoryStatus(int rootResourceId, boolean descendents) {
long start = System.currentTimeMillis();
ResourceManagerLocal resourceManager = LookupUtil.getResourceManager();
@@ -244,8 +268,8 @@ public class DiscoveryServerServiceImpl implements DiscoveryServerService {
}
private static boolean isVisibleInInventory(Resource resource) {
- return resource.getInventoryStatus() != InventoryStatus.DELETED &&
- resource.getInventoryStatus() != InventoryStatus.UNINVENTORIED;
+ return resource.getInventoryStatus() != InventoryStatus.DELETED
+ && resource.getInventoryStatus() != InventoryStatus.UNINVENTORIED;
}
@Override