modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java | 12
modules/enterprise/server/server-metrics/pom.xml | 3
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AbortedException.java | 23
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java | 37
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java | 92 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java | 34
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java | 127 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java | 84 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java | 73 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java | 133 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java | 257 +++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java | 378 +++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java | 113 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java | 99 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java | 106 ++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java | 498 ++++++++++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CassandraIntegrationTest.java | 41
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java | 26
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsPerfTests.java | 191 +++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java | 38
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsTest.java | 138 ++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForRawInserts.java | 51 +
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForWrite.java | 48
modules/enterprise/server/server-metrics/src/test/resources/log4j.xml | 2
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java | 7
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java | 41
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java | 40
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java | 3
28 files changed, 2579 insertions(+), 116 deletions(-)
New commits:
commit 3b192452ec06fa4c747a520989b66b28f986d02c
Author: John Sanda <jsanda@redhat.com>
Date: Wed Oct 2 17:36:21 2013 -0400
[BZ 1009945] squash commit of jsanda/aggregation branch
commit 1 - Initial commit for new Aggregator class
This is a first pass at computing multiple aggregates concurrently.
Conflicts:
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
commit 2 - update 6 hr index after inserting 1 hr data
commit 3 - initial support for generating 6 hr and 24 hr data
Conflicts:
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
commit 4 - delete 1 hr index entries when aggregation of time slice data is finished
commit 5 - handle scenarios for when there is no 1 hr and/or 6 hr data
This commit also includes some major test changes. AggregationTests is a more
thorough, though not yet complete, set of tests that provides better coverage.
commit 6 - finishing test for 24 hour data
Conflicts:
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
commit 7 - adding initial test for failure scenario
This test is for when we fail to fetch the raw data index.
commit 8 - This is a big refactoring of the initial implementation of Aggregator.java
The overall design is the same in terms of the fan-out approach for doing the
calculations, but some changes were necessary after an extensive performance
analysis. Initially the aggregation tasks were too granular: tasks were
submitted to the thread pool to process a single schedule. Schedules are now
processed in batches of 250. I did a lot of performance testing with various
batch sizes, and 250 seems optimal both in terms of execution time and
memory consumption. The other major change involves throttling. The initial
implementation had no throttling in place, which immediately caused read and/or
write timeouts. I then added throttling using RateLimiters, but MetricsServer
already had its own, separate throttling for inserting raw data. The same
throttling is now used by both MetricsServer and Aggregator, with separate
rate limiters for reads and for writes. Both are configurable as well (see the
sketch after this entry).
Conflicts:
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
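The shared throttling pattern is roughly the following; this is a sketch based
on the RateLimiter fields this commit adds to MetricsServer, using the
commit's default limits:

    import com.google.common.util.concurrent.RateLimiter;

    // Separate, configurable limiters for reads and writes, shared by
    // MetricsServer and Aggregator.
    RateLimiter readPermits = RateLimiter.create(
        Integer.parseInt(System.getProperty("rhq.storage.read-limit", "1000")));
    RateLimiter writePermits = RateLimiter.create(
        Integer.parseInt(System.getProperty("rhq.storage.write-limit", "2500")));

    // Every request blocks for a permit before being issued.
    readPermits.acquire();
    StorageResultSetFuture future = dao.findRawMetricsAsync(scheduleId,
        startTime, endTime);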
commit 9 - adding back support for aggregating 6 hour data
commit 10 - fix logic for determining if aggregation has finished
We check the respective counters to see whether raw, 1 hr, and 6 hr data
aggregation has finished. Before checking the counters, though, we need to make
sure that they have been initialized; they do not get initialized until the
respective index entries arrive. I have refactored the logic into a new method,
isAggregationFinished, which first waits for the arrival of all the index
entries and then checks the counters.
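The exact method body is not visible in this diff, so the following is an
assumed sketch of that check, using the counters and fields shown below in
Aggregator.java:

    // remainingIndexEntries drops to zero once the raw, 1 hr, and 6 hr index
    // entries have all arrived (or their fetch was aborted), which is also
    // when the three "remaining" counters get initialized.
    private boolean isAggregationFinished() {
        return remainingIndexEntries.get() == 0 &&
            state.getRemainingRawData().get() == 0 &&
            state.getRemaining1HourData().get() == 0 &&
            state.getRemaining6HourData().get() == 0;
    }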
commit 11 - set counters for remaining data when we fail to get index entries for 1 and 6 hr data
commit 12 - refactoring some duplicate code and fixing when tasks are scheduled
Aggregation tasks for 1 and 6 hour data were getting scheduled too early. They
cannot be scheduled until both their respective index entries have arrived and
aggregation for the previous buckets has completed.
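In Aggregator.run() that gate looks like this (simplified from the code later
in this patch):

    // 1 hour aggregation: wait for the raw-data bucket to drain, then for the
    // 1 hour index entries to arrive, before submitting any tasks.
    waitFor(state.getRemainingRawData());
    state.getOneHourIndexEntriesArrival().await();
    // ... submit Aggregate1HourData tasks in batches ...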
commit 13 - make connection pool sizes and rate limits configurable
Connection pools per host and rate limits are configurable via system
properties (example flags after this entry). While the connection pool sizes
are only set at start-up, I plan to make the rate limits self-tuning.
This commit also adds the missing method body to kick off 6 hour data
aggregation in Aggregate1HourData.java.
Conflicts:
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
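All of these knobs are plain JVM system properties, so they can be passed on
the server command line, for example:

    -Drhq.storage.client.local-connections=24
    -Drhq.storage.client.max-local-connections=32
    -Drhq.storage.read-limit=1000
    -Drhq.storage.write-limit=2500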
commit 14 - add support for running async aggregation in simulator
commit 15 - eliminate a couple of possible race conditions when processing index entries
commit 16 - cleaning up logging and adding some javadocs
Also moving all aggregation-related classes into the org.rhq.server.metrics.aggregation
package. All classes except Aggregator now have package-level access.
commit 17 - updating test with package refactoring
commit 18 - turn on async aggregation by default
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
index dc8c554..7d390f4 100644
--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
@@ -41,6 +41,8 @@ import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -355,6 +357,16 @@ public class StorageClientManagerBean {
.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)).withCompression(
ProtocolOptions.Compression.NONE).build();
+ PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(
+ System.getProperty("rhq.storage.client.local-connections", "24")));
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(
+ System.getProperty("rhq.storage.client.remote-connections", "16")));
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(
+ System.getProperty("rhq.storage.client.max-local-connections", "32")));
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(
+ System.getProperty("rhq.storage.client.max-remote-connections", "24")));
+
return cluster.connect(RHQ_KEYSPACE);
}
diff --git a/modules/enterprise/server/server-metrics/pom.xml b/modules/enterprise/server/server-metrics/pom.xml
index 0b4c141..122d7a5 100644
--- a/modules/enterprise/server/server-metrics/pom.xml
+++ b/modules/enterprise/server/server-metrics/pom.xml
@@ -157,6 +157,9 @@
<rhq.storage.cluster.dir>${rhq.storage.cluster.dir}</rhq.storage.cluster.dir>
<rhq.storage.cluster.deploy>${deployCluster}</rhq.storage.cluster.deploy>
</systemPropertyVariables>
+ <excludes>
+ <exclude>**/MetricsPerfTests.java</exclude>
+ </excludes>
</configuration>
</plugin>
</plugins>
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AbortedException.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AbortedException.java
new file mode 100644
index 0000000..cab2ab0
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AbortedException.java
@@ -0,0 +1,23 @@
+package org.rhq.server.metrics;
+
+/**
+ * @author John Sanda
+ */
+public class AbortedException extends Exception {
+
+ public AbortedException() {
+ super();
+ }
+
+ public AbortedException(String message) {
+ super(message);
+ }
+
+ public AbortedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AbortedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
index e62545e..66d0493 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
@@ -154,17 +154,36 @@ public class MetricsDAO {
return storageSession.execute(statement);
}
+ public StorageResultSetFuture insertOneHourDataAsync(int scheduleId, long timestamp, AggregateType type,
+ double value) {
+ BoundStatement statement = insertOneHourData.bind(scheduleId, new Date(timestamp), type.ordinal(), value);
+ return storageSession.executeAsync(statement);
+ }
+
public ResultSet insertSixHourData(int scheduleId, long timestamp, AggregateType type, double value) {
BoundStatement statement = insertSixHourData.bind(scheduleId, new Date(timestamp), type.ordinal(), value);
return storageSession.execute(statement);
}
+ public StorageResultSetFuture insertSixHourDataAsync(int scheduleId, long timestamp, AggregateType type,
+ double value) {
+ BoundStatement statement = insertSixHourData.bind(scheduleId, new Date(timestamp), type.ordinal(), value);
+ return storageSession.executeAsync(statement);
+ }
+
public ResultSet insertTwentyFourHourData(int scheduleId, long timestamp, AggregateType type, double value) {
BoundStatement statement = insertTwentyFourHourData.bind(scheduleId, new Date(timestamp), type.ordinal(),
value);
return storageSession.execute(statement);
}
+ public StorageResultSetFuture insertTwentyFourHourDataAsync(int scheduleId, long timestamp, AggregateType type,
+ double value) {
+ BoundStatement statement = insertTwentyFourHourData.bind(scheduleId, new Date(timestamp), type.ordinal(),
+ value);
+ return storageSession.executeAsync(statement);
+ }
+
public Iterable<RawNumericMetric> findRawMetrics(int scheduleId, long startTime, long endTime) {
try {
BoundStatement boundStatement = rawMetricsQuery.bind(scheduleId, new Date(startTime), new Date(endTime));
@@ -175,6 +194,11 @@ public class MetricsDAO {
}
}
+ public ResultSet findRawMetricsSync(int scheduleId, long startTime, long endTime) {
+ BoundStatement boundStatement = rawMetricsQuery.bind(scheduleId, new Date(startTime), new Date(endTime));
+ return storageSession.execute(boundStatement);
+ }
+
public StorageResultSetFuture findRawMetricsAsync(int scheduleId, long startTime, long endTime) {
BoundStatement boundStatement = rawMetricsQuery.bind(scheduleId, new Date(startTime), new Date(endTime));
return storageSession.executeAsync(boundStatement);
@@ -212,8 +236,7 @@ public class MetricsDAO {
}
public StorageResultSetFuture findSixHourMetricsAsync(int scheduleId, long startTime, long endTime) {
- BoundStatement statement = findSixHourMetricsByDateRange.bind(scheduleId, new Date(startTime),
- new Date(endTime));
+ BoundStatement statement = findSixHourMetricsByDateRange.bind(scheduleId, new Date(startTime), new Date(endTime));
return storageSession.executeAsync(statement);
}
@@ -260,6 +283,11 @@ public class MetricsDAO {
return new SimplePagedResult<MetricsIndexEntry>(statement, new MetricsIndexEntryMapper(table), storageSession);
}
+ public StorageResultSetFuture findMetricsIndexEntriesAsync(MetricsTable table, long timestamp) {
+ BoundStatement statement = findIndexEntries.bind(table.toString(), new Date(timestamp));
+ return storageSession.executeAsync(statement);
+ }
+
public ResultSet setFindTimeSliceForIndex(MetricsTable table, long timestamp) {
BoundStatement statement = findTimeSliceForIndex.bind(table.toString(), new Date(timestamp));
return storageSession.execute(statement);
@@ -282,4 +310,9 @@ public class MetricsDAO {
BoundStatement statement = deleteIndexEntries.bind(table.getTableName(), new Date(timestamp));
storageSession.execute(statement);
}
+
+ public StorageResultSetFuture deleteMetricsIndexEntriesAsync(MetricsTable table, long timestamp) {
+ BoundStatement statement = deleteIndexEntries.bind(table.getTableName(), new Date(timestamp));
+ return storageSession.executeAsync(statement);
+ }
}
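The new *Async methods above are consumed with Guava callbacks throughout this
patch; a minimal, illustrative call pattern (the callback bodies are
placeholders, and AggregateType.AVG is used here just as an example constant):

    StorageResultSetFuture future = dao.insertOneHourDataAsync(scheduleId,
        timestamp, AggregateType.AVG, value);
    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet resultSet) {
            // e.g., decrement a remaining-work counter
        }

        @Override
        public void onFailure(Throwable t) {
            // log, then still decrement so aggregation can finish
        }
    });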
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 33528e6..d319146 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,6 +43,9 @@ import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,6 +55,7 @@ import org.joda.time.Duration;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
import org.rhq.core.domain.measurement.composite.MeasurementDataNumericHighLowComposite;
+import org.rhq.server.metrics.aggregation.Aggregator;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
@@ -71,7 +75,11 @@ public class MetricsServer {
private MetricsConfiguration configuration;
- private Semaphore semaphore = new Semaphore(100);
+ private RateLimiter readPermits = RateLimiter.create(Integer.parseInt(
+ System.getProperty("rhq.storage.read-limit", "1000")), 3, TimeUnit.MINUTES);
+
+ private RateLimiter writePermits = RateLimiter.create(Integer.parseInt(
+ System.getProperty("rhq.storage.write-limit", "2500")), 3, TimeUnit.MINUTES);
private boolean pastAggregationMissed;
@@ -79,6 +87,13 @@ public class MetricsServer {
private AtomicLong totalAggregationTime = new AtomicLong();
+ private ListeningExecutorService aggregationWorkers = MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(5));
+
+ private int aggregationBatchSize;
+
+ private boolean useAsyncAggregation = Boolean.valueOf(System.getProperty("rhq.metrics.aggregation.async", "true"));
+
public void setDAO(MetricsDAO dao) {
this.dao = dao;
}
@@ -91,7 +106,34 @@ public class MetricsServer {
this.dateTimeService = dateTimeService;
}
+ public void setAggregationBatchSize(int batchSize) {
+ aggregationBatchSize = batchSize;
+ }
+
+ public void setUseAsyncAggregation(boolean useAsyncAggregation) {
+ this.useAsyncAggregation = useAsyncAggregation;
+ }
+
+ public RateLimiter getReadPermits() {
+ return readPermits;
+ }
+
+ public void setReadPermits(RateLimiter readPermits) {
+ this.readPermits = readPermits;
+ }
+
+ public RateLimiter getWritePermits() {
+ return writePermits;
+ }
+
+ public void setWritePermits(RateLimiter writePermits) {
+ this.writePermits = writePermits;
+ }
+
public void init() {
+ if (log.isDebugEnabled() && useAsyncAggregation) {
+ log.debug("Async aggregation is enabled");
+ }
determineMostRecentRawDataSinceLastShutdown();
}
@@ -131,6 +173,11 @@ public class MetricsServer {
}
}
+ private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
+ DateTime endTime = startTime.plus(duration);
+ return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= 0;
+ }
+
protected DateTime currentHour() {
return dateTimeService.getTimeSlice(dateTimeService.now(), configuration.getRawTimeSliceDuration());
}
@@ -140,6 +187,7 @@ public class MetricsServer {
}
public void shutdown() {
+ aggregationWorkers.shutdown();
}
public RawNumericMetric findLatestValueForResource(int scheduleId) {
@@ -363,7 +411,7 @@ public class MetricsServer {
final AtomicInteger remainingInserts = new AtomicInteger(dataSet.size());
for (final MeasurementDataNumeric data : dataSet) {
- semaphore.acquire();
+ writePermits.acquire();
StorageResultSetFuture resultSetFuture = dao.insertRawData(data);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
@Override
@@ -381,7 +429,6 @@ public class MetricsServer {
throwable.getClass().getName() + ": " + throwable.getMessage());
}
callback.onFailure(throwable);
- semaphore.release();
}
});
}
@@ -396,6 +443,7 @@ public class MetricsServer {
long timeSlice = dateTimeService.getTimeSlice(new DateTime(rawData.getTimestamp()),
configuration.getRawTimeSliceDuration()).getMillis();
+ writePermits.acquire();
StorageResultSetFuture resultSetFuture = dao.updateMetricsIndex(MetricsTable.ONE_HOUR, rawData.getScheduleId(),
timeSlice);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
@@ -409,7 +457,6 @@ public class MetricsServer {
}
callback.onFinish();
}
- semaphore.release();
}
@Override
@@ -417,7 +464,6 @@ public class MetricsServer {
log.error("An error occurred while trying to update " + MetricsTable.INDEX + " for raw data " +
rawData);
callback.onFailure(throwable);
- semaphore.release();
}
});
}
@@ -431,14 +477,24 @@ public class MetricsServer {
* for subsequently computing baselines.
*/
public Iterable<AggregateNumericMetric> calculateAggregates() {
- DateTime theHour = currentHour();
+ long start = System.currentTimeMillis();
+ try {
+ DateTime theHour = currentHour();
- if (pastAggregationMissed) {
- calculateAggregates(roundDownToHour(mostRecentRawDataPriorToStartup).plusHours(1).getMillis());
- pastAggregationMissed = false;
- return calculateAggregates(theHour.getMillis());
- } else {
- return calculateAggregates(theHour.getMillis());
+ if (pastAggregationMissed) {
+ theHour = roundDownToHour(mostRecentRawDataPriorToStartup).plusHours(1);
+ pastAggregationMissed = false;
+ }
+
+ if (useAsyncAggregation) {
+ DateTime timeSlice = theHour.minus(configuration.getRawTimeSliceDuration());
+ return new Aggregator(aggregationWorkers, dao, configuration, dateTimeService, timeSlice,
+ aggregationBatchSize, writePermits, readPermits).run();
+ } else {
+ return calculateAggregates(theHour.getMillis());
+ }
+ } finally {
+ log.info("Finished metrics aggregation in " + (System.currentTimeMillis() - start) + " ms");
}
}
@@ -460,16 +516,6 @@ public class MetricsServer {
long twentyFourHourTimeSlice = dateTimeService.getTimeSlice(lastHour,
configuration.getSixHourTimeSliceDuration()).getMillis();
- // We first query the metrics index table to determine which schedules have data to
- // be aggregated. Then we retrieve the metric data and aggregate or compress the
- // data, writing the compressed values into the next wider (i.e., longer life span
- // for data) bucket/table. At this point we remove the index entries for the data
- // that has already been processed. We purge the entire row in the index table.
- // We can safely do this because the row wi..
- //
- // The last step in the work flow is to update the metrics
- // index for the newly persisted aggregates.
-
List<AggregateNumericMetric> newOneHourAggregates = null;
Stopwatch stopwatch = new Stopwatch().start();
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
new file mode 100644
index 0000000..2d0d190
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
@@ -0,0 +1,34 @@
+package org.rhq.server.metrics;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author John Sanda
+ */
+public class SignalingCountDownLatch {
+
+ private boolean aborted;
+
+ private CountDownLatch latch;
+
+ public SignalingCountDownLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public void await() throws InterruptedException, AbortedException {
+ latch.await();
+ if (aborted) {
+ throw new AbortedException();
+ }
+ }
+
+ public void abort() {
+ aborted = true;
+ latch.countDown();
+ }
+
+ public void countDown() {
+ latch.countDown();
+ }
+
+}
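A usage sketch for this latch, mirroring how AggregateIndexEntriesHandler
(further down in this patch) drives it:

    SignalingCountDownLatch arrival =
        new SignalingCountDownLatch(new CountDownLatch(1));

    // Producer side: signal normal arrival, or abort on failure.
    arrival.countDown();    // index entries arrived
    // arrival.abort();     // fetch failed; wakes waiters via AbortedException

    // Consumer side: await() rethrows an abort as AbortedException.
    try {
        arrival.await();
    } catch (AbortedException e) {
        // the index entries never arrived; skip the dependent work
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }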
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
new file mode 100644
index 0000000..1698b28
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
@@ -0,0 +1,127 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted for the batch, aggregation of 6 hour
+ * data will start immediately for the batch if the 24 hour time slice has finished.
+ *
+ * @see Compute6HourData
+ * @author John Sanda
+ */
+class Aggregate1HourData implements Runnable {
+
+ private final Log log = LogFactory.getLog(Aggregate1HourData.class);
+
+ private MetricsDAO dao;
+
+ private AggregationState state;
+
+ private Set<Integer> scheduleIds;
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+ List<StorageResultSetFuture> queryFutures) {
+ this.dao = dao;
+ this.state = state;
+ this.scheduleIds = scheduleIds;
+ this.queryFutures = queryFutures;
+ }
+
+ @Override
+ public void run() {
+ final Stopwatch stopwatch = new Stopwatch().start();
+ ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
+ Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
+ @Override
+ public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+ log.error("An error occurred while fetching one hour data", t);
+ return Futures.immediateFailedFuture(t);
+ }
+ });
+ ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
+ state.getCompute6HourData(), state.getAggregationTasks());
+ Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> result) {
+ stopwatch.stop();
+ log.debug("Finished aggregating 1 hour data for " + result.size() + " schedules in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ start6HourDataAggregationIfNecessary();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ // TODO should we log the schedule ids?
+ log.debug("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred.", t);
+ } else {
+ log.warn("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+ }
+ start6HourDataAggregationIfNecessary();
+ }
+ });
+ }
+
+ private void start6HourDataAggregationIfNecessary() {
+ try {
+ if (state.is24HourTimeSliceFinished()) {
+ update6HourIndexEntries();
+ List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(scheduleIds.size());
+ for (Integer scheduleId : scheduleIds) {
+ queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
+ state.getTwentyFourHourTimeSliceEnd().getMillis()));
+ }
+ state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds, queryFutures));
+ }
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("An interrupt occurred while waiting for 6 hour data index entries. Aborting data aggregation",
+ e);
+ } else {
+ log.info("An interrupt occurred while waiting for 6 hour data index entries. Aborting data " +
+ "aggregation: " + e.getMessage());
+ }
+ } finally {
+ state.getRemaining1HourData().addAndGet(-scheduleIds.size());
+ }
+ }
+
+ private void update6HourIndexEntries() throws InterruptedException {
+ try {
+ state.getSixHourIndexEntriesArrival().await();
+ try {
+ state.getSixHourIndexEntriesLock().writeLock().lock();
+ state.getSixHourIndexEntries().removeAll(scheduleIds);
+ } finally {
+ state.getSixHourIndexEntriesLock().writeLock().unlock();
+ }
+ } catch (AbortedException e) {
+ // This means we failed to retrieve the index entries. We can however
+ // continue generating 6 hour data because we do not need the index
+ // here since we already have 6 hour data to aggregate along with the
+ // schedule ids.
+ }
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
new file mode 100644
index 0000000..fbd5057
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
@@ -0,0 +1,84 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 24 hour data for a batch of 6 hour data futures. This is the final aggregation step; nothing further
+ * is kicked off after the 24 hour data for the batch is inserted.
+ *
+ * @see Compute24HourData
+ * @author John Sanda
+ */
+class Aggregate6HourData implements Runnable {
+
+ private final Log log = LogFactory.getLog(Aggregate6HourData.class);
+
+ private MetricsDAO dao;
+
+ private AggregationState state;
+
+ private Set<Integer> scheduleIds;
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+ List<StorageResultSetFuture> queryFutures) {
+ this.dao = dao;
+ this.state = state;
+ this.scheduleIds = scheduleIds;
+ this.queryFutures = queryFutures;
+ }
+
+ @Override
+ public void run() {
+ final Stopwatch stopwatch = new Stopwatch().start();
+ ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
+ Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
+ @Override
+ public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+ log.error("An error occurred while fetching 6 hour data", t);
+ return Futures.immediateFailedFuture(t);
+ }
+ });
+ ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
+ state.getCompute24HourData(), state.getAggregationTasks());
+ Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> result) {
+ stopwatch.stop();
+ log.debug("Finished aggregating 6 hour data for " + result.size() + " schedules in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ // TODO should we log the schedule ids?
+ log.debug("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred.", t);
+ } else {
+ log.warn("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+ }
+ state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+ }
+ });
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
new file mode 100644
index 0000000..cb64fcf
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
@@ -0,0 +1,73 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * @author John Sanda
+ */
+class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
+
+ private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
+
+ private Set<Integer> indexEntries;
+
+ private AtomicInteger remainingData;
+
+ private SignalingCountDownLatch indexEntriesArrival;
+
+ private Stopwatch stopwatch;
+
+ private String src;
+
+ private String dest;
+
+ public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger remainingData,
+ SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src, String dest) {
+ this.indexEntries = indexEntries;
+ this.remainingData = remainingData;
+ this.indexEntriesArrival = indexEntriesArrival;
+ this.stopwatch = stopwatch;
+ this.src = src;
+ this.dest = dest;
+ }
+
+ @Override
+ public void onSuccess(ResultSet resultSet) {
+ for (Row row : resultSet) {
+ indexEntries.add(row.getInt(1));
+ }
+ remainingData.set(indexEntries.size());
+ indexEntriesArrival.countDown();
+ stopwatch.stop();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished loading " + indexEntries.size() + " " + src + " index entries in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
+ "retrieving " + src + " index entries", t);
+ } else {
+ log.warn("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
+ "retrieving " + src + " index entries: " + ThrowableUtil.getRootMessage(t));
+ }
+ remainingData.set(0);
+ indexEntriesArrival.abort();
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
new file mode 100644
index 0000000..87a7266
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
@@ -0,0 +1,133 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 1 hour data for a batch of raw data futures. After data is inserted for the batch, aggregation of 1 hour
+ * data will start immediately for the batch if the 6 hour time slice has finished.
+ *
+ * @see Compute1HourData
+ * @author John Sanda
+ */
+class AggregateRawData implements Runnable {
+
+ private final Log log = LogFactory.getLog(AggregateRawData.class);
+
+ private MetricsDAO dao;
+
+ private AggregationState state;
+
+ private Set<Integer> scheduleIds;
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+ List<StorageResultSetFuture> queryFutures) {
+ this.dao = dao;
+ this.state = state;
+ this.scheduleIds = scheduleIds;
+ this.queryFutures = queryFutures;
+ }
+
+ @Override
+ public void run() {
+ final Stopwatch stopwatch = new Stopwatch().start();
+ ListenableFuture<List<ResultSet>> rawDataFutures = Futures.successfulAsList(queryFutures);
+ Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
+ @Override
+ public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+ log.error("An error occurred while fetching raw data", t);
+ return Futures.immediateFailedFuture(t);
+ }
+ });
+
+ final ListenableFuture<List<ResultSet>> insert1HourDataFutures = Futures.transform(rawDataFutures,
+ state.getCompute1HourData(), state.getAggregationTasks());
+ Futures.addCallback(insert1HourDataFutures, new FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> resultSets) {
+ stopwatch.stop();
+ log.debug("Finished aggregating raw data for " + resultSets.size() + " schedules in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ start1HourDataAggregationIfNecessary();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ // TODO should we log the schedule ids?
+ log.debug("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An unexpected " +
+ "error occurred.", t);
+ } else {
+ log.warn("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+ }
+ start1HourDataAggregationIfNecessary();
+ }
+ }, state.getAggregationTasks());
+ }
+
+ private void start1HourDataAggregationIfNecessary() {
+ try {
+ if (state.is6HourTimeSliceFinished()) {
+ update1HourIndexEntries();
+ List<StorageResultSetFuture> oneHourDataQueryFutures = new ArrayList<StorageResultSetFuture>(
+ scheduleIds.size());
+ for (Integer scheduleId : scheduleIds) {
+ oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
+ state.getSixHourTimeSlice().getMillis(), state.getSixHourTimeSliceEnd().getMillis()));
+ }
+ state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+ oneHourDataQueryFutures));
+ }
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("An interrupt occurred while waiting for 1 hour data index entries. Aborting data aggregation",
+ e);
+ } else {
+ log.info("An interrupt occurred while waiting for 1 hour data index entries. Aborting data " +
+ "aggregation: " + e.getMessage());
+ }
+ } finally {
+ state.getRemainingRawData().addAndGet(-scheduleIds.size());
+ }
+ }
+
+ private void update1HourIndexEntries() throws InterruptedException {
+ try {
+ // Wait for the arrival so that we can remove the schedule ids in this
+ // batch from the one hour index entries. This will prevent duplicate tasks
+ // being submitted to process the same 1 hour data.
+ state.getOneHourIndexEntriesArrival().await();
+ try {
+ state.getOneHourIndexEntriesLock().writeLock().lock();
+ state.getOneHourIndexEntries().removeAll(scheduleIds);
+ } finally {
+ state.getOneHourIndexEntriesLock().writeLock().unlock();
+ }
+ } catch (AbortedException e) {
+ // This means we failed to retrieve the index entries. We can however
+ // continue generating 1 hour data because we do not need the index
+ // here since we already have 1 hour data to aggregate along with the
+ // schedule ids.
+ }
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
new file mode 100644
index 0000000..345e53a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
@@ -0,0 +1,257 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * @author John Sanda
+ */
+class AggregationState {
+
+ private ListeningExecutorService aggregationTasks;
+
+ private SignalingCountDownLatch oneHourIndexEntriesArrival;
+
+ private SignalingCountDownLatch sixHourIndexEntriesArrival;
+
+ private AtomicInteger remainingRawData;
+
+ private AtomicInteger remaining1HourData;
+
+ private AtomicInteger remaining6HourData;
+
+ private Set<Integer> oneHourIndexEntries;
+
+ private Set<Integer> sixHourIndexEntries;
+
+ private ReentrantReadWriteLock oneHourIndexEntriesLock;
+
+ private ReentrantReadWriteLock sixHourIndexEntriesLock;
+
+ private DateTime oneHourTimeSlice;
+
+ private DateTime sixHourTimeSlice;
+
+ private DateTime sixHourTimeSliceEnd;
+
+ private DateTime twentyFourHourTimeSlice;
+
+ private DateTime twentyFourHourTimeSliceEnd;
+
+ private boolean sixHourTimeSliceFinished;
+
+ private boolean twentyFourHourTimeSliceFinished;
+
+ private Compute1HourData compute1HourData;
+
+ private Compute6HourData compute6HourData;
+
+ private Compute24HourData compute24HourData;
+
+ public ListeningExecutorService getAggregationTasks() {
+ return aggregationTasks;
+ }
+
+ public AggregationState setAggregationTasks(ListeningExecutorService aggregationTasks) {
+ this.aggregationTasks = aggregationTasks;
+ return this;
+ }
+
+ /**
+ * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 1 hour
+ * data to be aggregated
+ */
+ public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
+ return oneHourIndexEntriesArrival;
+ }
+
+ public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch oneHourIndexEntriesArrival) {
+ this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
+ return this;
+ }
+
+ /**
+ * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 6 hour
+ * data to be aggregated
+ */
+ public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
+ return sixHourIndexEntriesArrival;
+ }
+
+ public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch sixHourIndexEntriesArrival) {
+ this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
+ return this;
+ }
+
+ /**
+ * @return The remaining number of schedules with raw data to be aggregated
+ */
+ public AtomicInteger getRemainingRawData() {
+ return remainingRawData;
+ }
+
+ public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
+ this.remainingRawData = remainingRawData;
+ return this;
+ }
+
+ /**
+ * @return The remaining number of schedules with 1 hour data to be aggregated
+ */
+ public AtomicInteger getRemaining1HourData() {
+ return remaining1HourData;
+ }
+
+ public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) {
+ this.remaining1HourData = remaining1HourData;
+ return this;
+ }
+
+ /**
+ * @return The remaining number of schedules with 6 hour data to be aggregated
+ */
+ public AtomicInteger getRemaining6HourData() {
+ return remaining6HourData;
+ }
+
+ public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) {
+ this.remaining6HourData = remaining6HourData;
+ return this;
+ }
+
+ /**
+ * @return The schedule ids with 1 hour data to be aggregated
+ */
+ public Set<Integer> getOneHourIndexEntries() {
+ return oneHourIndexEntries;
+ }
+
+ public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) {
+ this.oneHourIndexEntries = oneHourIndexEntries;
+ return this;
+ }
+
+ public Set<Integer> getSixHourIndexEntries() {
+ return sixHourIndexEntries;
+ }
+
+ public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) {
+ this.sixHourIndexEntries = sixHourIndexEntries;
+ return this;
+ }
+
+ public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
+ return oneHourIndexEntriesLock;
+ }
+
+ public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock oneHourIndexEntriesLock) {
+ this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
+ return this;
+ }
+
+ public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
+ return sixHourIndexEntriesLock;
+ }
+
+ public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock sixHourIndexEntriesLock) {
+ this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
+ return this;
+ }
+
+ public DateTime getOneHourTimeSlice() {
+ return oneHourTimeSlice;
+ }
+
+ public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
+ this.oneHourTimeSlice = oneHourTimeSlice;
+ return this;
+ }
+
+ public DateTime getSixHourTimeSlice() {
+ return sixHourTimeSlice;
+ }
+
+ public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
+ this.sixHourTimeSlice = sixHourTimeSlice;
+ return this;
+ }
+
+ public DateTime getSixHourTimeSliceEnd() {
+ return sixHourTimeSliceEnd;
+ }
+
+ public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
+ this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
+ return this;
+ }
+
+ public DateTime getTwentyFourHourTimeSlice() {
+ return twentyFourHourTimeSlice;
+ }
+
+ public AggregationState setTwentyFourHourTimeSlice(DateTime twentyFourHourTimeSlice) {
+ this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+ return this;
+ }
+
+ public DateTime getTwentyFourHourTimeSliceEnd() {
+ return twentyFourHourTimeSliceEnd;
+ }
+
+ public AggregationState setTwentyFourHourTimeSliceEnd(DateTime twentyFourHourTimeSliceEnd) {
+ this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
+ return this;
+ }
+
+ public boolean is6HourTimeSliceFinished() {
+ return sixHourTimeSliceFinished;
+ }
+
+ public AggregationState set6HourTimeSliceFinished(boolean is6HourTimeSliceFinished) {
+ this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
+ return this;
+ }
+
+ public boolean is24HourTimeSliceFinished() {
+ return twentyFourHourTimeSliceFinished;
+ }
+
+ public AggregationState set24HourTimeSliceFinished(boolean is24HourTimeSliceFinished) {
+ this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
+ return this;
+ }
+
+ public Compute1HourData getCompute1HourData() {
+ return compute1HourData;
+ }
+
+ public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
+ this.compute1HourData = compute1HourData;
+ return this;
+ }
+
+ public Compute6HourData getCompute6HourData() {
+ return compute6HourData;
+ }
+
+ public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
+ this.compute6HourData = compute6HourData;
+ return this;
+ }
+
+ public Compute24HourData getCompute24HourData() {
+ return compute24HourData;
+ }
+
+ public AggregationState setCompute24HourData(Compute24HourData compute24HourData) {
+ this.compute24HourData = compute24HourData;
+ return this;
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
new file mode 100644
index 0000000..bf0bc1a
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -0,0 +1,378 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+import org.joda.time.Duration;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.DateTimeService;
+import org.rhq.server.metrics.MetricsConfiguration;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * This class provides the main interface for metric data aggregation.
+ *
+ * @author John Sanda
+ */
+public class Aggregator {
+
+ private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR = new Comparator<AggregateNumericMetric>() {
+ @Override
+ public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
+ return (left.getScheduleId() < right.getScheduleId()) ? -1 : ((left.getScheduleId() == right.getScheduleId()) ? 0 : 1);
+ }
+ };
+
+ private final Log log = LogFactory.getLog(Aggregator.class);
+
+ private MetricsDAO dao;
+
+ private MetricsConfiguration configuration;
+
+ private DateTimeService dtService;
+
+ private DateTime startTime;
+
+ /**
+ * Signals when raw data index entries (in metrics_index) can be deleted. We cannot delete the row in metrics_index
+ * until we know that it has been read, successfully or otherwise.
+ */
+ private SignalingCountDownLatch rawDataIndexEntriesArrival;
+
+ private RateLimiter readPermits;
+ private RateLimiter writePermits;
+
+ private int batchSize;
+
+ private AggregationState state;
+
+ private Set<AggregateNumericMetric> oneHourData;
+
+ private AtomicInteger remainingIndexEntries;
+
+ public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, MetricsConfiguration configuration,
+ DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter writePermits,
+ RateLimiter readPermits) {
+ this.dao = dao;
+ this.configuration = configuration;
+ this.dtService = dtService;
+ this.startTime = startTime;
+ this.readPermits = readPermits;
+ this.writePermits = writePermits;
+ this.batchSize = batchSize;
+ oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
+ rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
+ remainingIndexEntries = new AtomicInteger(1);
+
+ DateTime sixHourTimeSlice = get6HourTimeSlice();
+ DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
+
+ state = new AggregationState()
+ .setAggregationTasks(aggregationTasks)
+ .setOneHourTimeSlice(startTime)
+ .setSixHourTimeSlice(sixHourTimeSlice)
+ .setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
+ .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
+ .setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
+ .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, writePermits, dao, oneHourData))
+ .setCompute6HourData(new Compute6HourData(sixHourTimeSlice, twentyFourHourTimeSlice, writePermits, dao))
+ .setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, writePermits, dao))
+ .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice, configuration.getOneHourTimeSliceDuration()))
+ .set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
+ configuration.getSixHourTimeSliceDuration()))
+ .setRemainingRawData(new AtomicInteger(0))
+ .setRemaining1HourData(new AtomicInteger(0))
+ .setRemaining6HourData(new AtomicInteger(0))
+ .setOneHourIndexEntries(new TreeSet<Integer>())
+ .setSixHourIndexEntries(new TreeSet<Integer>())
+ .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
+ .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
+
+ if (state.is6HourTimeSliceFinished()) {
+ state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
+ remainingIndexEntries.incrementAndGet();
+ } else {
+ state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
+ state.setRemaining1HourData(new AtomicInteger(0));
+ }
+
+ if (state.is24HourTimeSliceFinished()) {
+ state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
+ remainingIndexEntries.incrementAndGet();
+ } else {
+ state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
+ state.setRemaining6HourData(new AtomicInteger(0));
+ }
+ }
+
+ private DateTime get24HourTimeSlice() {
+ return dtService.getTimeSlice(startTime, configuration.getSixHourTimeSliceDuration());
+ }
+
+ private DateTime get6HourTimeSlice() {
+ return dtService.getTimeSlice(startTime, configuration.getOneHourTimeSliceDuration());
+ }
+
+ private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
+ DateTime endTime = startTime.plus(duration);
+ return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= 0;
+ }
+
+ protected DateTime currentHour() {
+ return dtService.getTimeSlice(dtService.now(), configuration.getRawTimeSliceDuration());
+ }
+
+ public Set<AggregateNumericMetric> run() {
+ log.info("Starting aggregation for time slice " + startTime);
+ readPermits.acquire();
+ StorageResultSetFuture rawFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
+ startTime.getMillis());
+ Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ List<Row> rows = result.all();
+ state.getRemainingRawData().set(rows.size());
+ rawDataIndexEntriesArrival.countDown();
+
+ Stopwatch stopwatch = new Stopwatch().start();
+
+ final DateTime endTime = startTime.plus(configuration.getRawTimeSliceDuration());
+ Set<Integer> scheduleIds = new TreeSet<Integer>();
+ List<StorageResultSetFuture> rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+ for (final Row row : rows) {
+ scheduleIds.add(row.getInt(1));
+ readPermits.acquire();
+ rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), startTime.getMillis(),
+ endTime.getMillis()));
+ if (rawDataFutures.size() == batchSize) {
+ state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+ rawDataFutures));
+ rawDataFutures = new ArrayList<StorageResultSetFuture>();
+ scheduleIds = new TreeSet<Integer>();
+ }
+ }
+ if (!rawDataFutures.isEmpty()) {
+ state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+ rawDataFutures));
+ }
+
+ if (log.isDebugEnabled()) {
+ stopwatch.stop();
+ log.debug("Finished scheduling raw data aggregation tasks for " + rows.size() + " schedules in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+ "unexpected error while retrieving raw data index entries.", t);
+ } else {
+ log.warn("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+ "unexpected error while retrieving raw data index entries: " + ThrowableUtil.getRootMessage(t));
+ }
+ state.setRemainingRawData(new AtomicInteger(0));
+ rawDataIndexEntriesArrival.abort();
+ deleteIndexEntries(MetricsTable.ONE_HOUR);
+ }
+ }, state.getAggregationTasks());
+
+ if (state.is6HourTimeSliceFinished()) {
+ log.debug("Fetching 1 hour index entries");
+ Stopwatch stopwatch = new Stopwatch().start();
+ StorageResultSetFuture oneHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
+ state.getSixHourTimeSlice().getMillis());
+ Futures.addCallback(oneHourFuture, new AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
+ state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), stopwatch, "1 hour", "6 hour"),
+ state.getAggregationTasks());
+ }
+
+ if (state.is24HourTimeSliceFinished()) {
+ log.debug("Fetching 6 hour index entries");
+ Stopwatch stopwatch = new Stopwatch().start();
+ StorageResultSetFuture sixHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
+ state.getTwentyFourHourTimeSlice().getMillis());
+ Futures.addCallback(sixHourFuture, new AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
+ state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), stopwatch, "6 hour", "24 hour"),
+ state.getAggregationTasks());
+ }
+
+ try {
+ try {
+ rawDataIndexEntriesArrival.await();
+ deleteIndexEntries(MetricsTable.ONE_HOUR);
+ } catch (AbortedException e) {
+ }
+
+ if (state.is6HourTimeSliceFinished()) {
+ waitFor(state.getRemainingRawData());
+ try {
+ state.getOneHourIndexEntriesArrival().await();
+ deleteIndexEntries(MetricsTable.SIX_HOUR);
+
+ List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+ Set<Integer> scheduleIds = new TreeSet<Integer>();
+ state.getOneHourIndexEntriesLock().writeLock().lock();
+ log.debug("Preparing to submit 1 hour data aggregation tasks for " +
+ state.getOneHourIndexEntries().size() + " schedules");
+ for (Integer scheduleId : state.getOneHourIndexEntries()) {
+ queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
+ state.getSixHourTimeSliceEnd().getMillis()));
+ scheduleIds.add(scheduleId);
+ if (queryFutures.size() == batchSize) {
+ state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+ queryFutures));
+ queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+ scheduleIds = new TreeSet<Integer>();
+ }
+ }
+ if (!queryFutures.isEmpty()) {
+ state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+ queryFutures));
+ queryFutures = null;
+ scheduleIds = null;
+ }
+ } catch (AbortedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+ "loading 1 hour index entries", e);
+ } else {
+ log.warn("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+ "loading 1 hour index entries: " + ThrowableUtil.getRootMessage(e));
+ }
+ } finally {
+ state.getOneHourIndexEntriesLock().writeLock().unlock();
+ }
+ }
+
+ if (state.is24HourTimeSliceFinished()) {
+ waitFor(state.getRemaining1HourData());
+ try {
+ state.getSixHourIndexEntriesArrival().await();
+ deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
+
+ List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+ Set<Integer> scheduleIds = new TreeSet<Integer>();
+ state.getSixHourIndexEntriesLock().writeLock().lock();
+ log.debug("Preparing to submit 6 hour data aggregation tasks for " +
+ state.getSixHourIndexEntries().size() + " schedules");
+ for (Integer scheduleId : state.getSixHourIndexEntries()) {
+ queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
+ state.getTwentyFourHourTimeSliceEnd().getMillis()));
+ scheduleIds.add(scheduleId);
+ if (queryFutures.size() == batchSize) {
+ state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+ queryFutures));
+ queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+ scheduleIds = new TreeSet<Integer>();
+ }
+ }
+ if (!queryFutures.isEmpty()) {
+ state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+ queryFutures));
+ queryFutures = null;
+ scheduleIds = null;
+ }
+ } catch (AbortedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+ "loading 6 hour index entries", e);
+ } else {
+ log.warn("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+ "loading 6 hour index entries: " + ThrowableUtil.getRootMessage(e));
+ }
+ } finally {
+ state.getSixHourIndexEntriesLock().writeLock().unlock();
+ }
+ }
+
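+ // Busy-wait until the async callbacks have drained all of the remaining-work counters.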
+ while (!isAggregationFinished()) {
+ Thread.sleep(50);
+ }
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work.", e);
+ } else {
+ log.warn("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work: " +
+ ThrowableUtil.getRootMessage(e));
+ }
+ log.warn("An interrupt occurred while waiting for aggregation to finish", e);
+ }
+ return oneHourData;
+ }
+
+ private void waitFor(AtomicInteger remainingData) throws InterruptedException {
+ while (remainingData.get() > 0) {
+ Thread.sleep(50);
+ }
+ }
+
+ private boolean isAggregationFinished() {
+ return state.getRemainingRawData().get() <= 0 && state.getRemaining1HourData().get() <= 0 &&
+ state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() <= 0;
+ }
+
+ private void deleteIndexEntries(final MetricsTable table) {
+ final DateTime time;
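+ // Map the index table back to the time slice whose entries were just processed.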
+ switch (table) {
+ case ONE_HOUR:
+ time = startTime;
+ break;
+ case SIX_HOUR:
+ time = state.getSixHourTimeSlice();
+ break;
+ default:
+ time = state.getTwentyFourHourTimeSlice();
+ break;
+ }
+ log.debug("Deleting " + table + " index entries for time slice " + time);
+ writePermits.acquire();
+ StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, time.getMillis());
+ Futures.addCallback(future, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ remainingIndexEntries.decrementAndGet();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to delete index entries for table " + table + " at time [" + time + "]. An " +
+ "unexpected error occurred.", t);
+ } else {
+ log.warn("Failed to delete index entries for table " + table + " at time [" + time + "]. An " +
+ "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+ }
+ remainingIndexEntries.decrementAndGet();
+ }
+ });
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
new file mode 100644
index 0000000..f130f75
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
@@ -0,0 +1,113 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 1 hour data for a batch of raw data result sets. The generated 1 hour aggregates are inserted along with
+ * their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute1HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute1HourData.class);
+
+ private DateTime startTime;
+
+ private RateLimiter writePermits;
+
+ private MetricsDAO dao;
+
+ private DateTime sixHourTimeSlice;
+
+ private Set<AggregateNumericMetric> oneHourData;
+
+ public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, RateLimiter writePermits, MetricsDAO dao,
+ Set<AggregateNumericMetric> oneHourData) {
+ this.startTime = startTime;
+ this.sixHourTimeSlice = sixHourTimeSlice;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ this.oneHourData = oneHourData;
+ }
+
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet> rawDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 1 hour data for " + rawDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures = new ArrayList<StorageResultSetFuture>(rawDataResultSets.size());
+ for (ResultSet resultSet : rawDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregatedRaw(resultSet);
+ oneHourData.add(aggregate);
+ writePermits.acquire(4);
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR, aggregate.getScheduleId(),
+ sixHourTimeSlice.getMillis()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally {
+ if (log.isDebugEnabled()) {
+ stopwatch.stop();
+ log.debug("Finished computing and storing 1 hour data for " + rawDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ }
+ }
+
+ private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ int count = 0;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ double value;
+ List<Row> rows = resultSet.all();
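+ // Column 0 of each row holds the schedule id and column 2 the raw value.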
+
+ for (Row row : rows) {
+ value = row.getDouble(2);
+ if (count == 0) {
+ min = value;
+ max = min;
+ }
+ if (value < min) {
+ min = value;
+ } else if (value > max) {
+ max = value;
+ }
+ mean.add(value);
+ ++count;
+ }
+
+ return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+ startTime.getMillis());
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
new file mode 100644
index 0000000..6fe9d79
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
@@ -0,0 +1,99 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+
+/**
+ * Computes 24 hour data for a batch of 6 hour data result sets. The generated 24 hour aggregates are inserted.
+ *
+ * @author John Sanda
+ */
+class Compute24HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute24HourData.class);
+
+ private DateTime startTime;
+
+ private RateLimiter writePermits;
+
+ private MetricsDAO dao;
+
+ public Compute24HourData(DateTime startTime, RateLimiter writePermits, MetricsDAO dao) {
+ this.startTime = startTime;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ }
+
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet> sixHourDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 24 hour data for " + sixHourDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures =
+ new ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
+ for (ResultSet resultSet : sixHourDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+ writePermits.acquire(3);
+ insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+ insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+ insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally {
+ if (log.isDebugEnabled()) {
+ stopwatch.stop();
+ log.debug("Finished computing and storing 24 hour data for " + sixHourDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ }
+ }
+
+ private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ List<Row> rows = resultSet.all();
+
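+ // Each 6 hour time slice produces three rows, one per aggregate type; as consumed here,
+ // row i holds the max, row i + 1 the min, and row i + 2 the avg, with the value in column 3.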
+ for (int i = 0; i < rows.size(); i += 3) {
+ if (i == 0) {
+ min = rows.get(i + 1).getDouble(3);
+ max = rows.get(i).getDouble(3);
+ } else {
+ if (rows.get(i + 1).getDouble(3) < min) {
+ min = rows.get(i + 1).getDouble(3);
+ }
+ if (rows.get(i).getDouble(3) > max) {
+ max = rows.get(i).getDouble(3);
+ }
+ }
+ mean.add(rows.get(i + 2).getDouble(3));
+ }
+ return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+ startTime.getMillis());
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
new file mode 100644
index 0000000..ec1ee26
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
@@ -0,0 +1,106 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 6 hour data for a batch of 1 hour data result sets. The generated 6 hour aggregates are inserted along with
+ * their corresponding index updates.
+ *
+ * @author John Sanda
+ */
+class Compute6HourData implements AsyncFunction<List<ResultSet>, List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute6HourData.class);
+
+ private DateTime startTime;
+
+ private RateLimiter writePermits;
+
+ private MetricsDAO dao;
+
+ private DateTime twentyFourHourTimeSlice;
+
+ public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice, RateLimiter writePermits,
+ MetricsDAO dao) {
+ this.startTime = startTime;
+ this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ }
+
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet> oneHourDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 6 hour data for " + oneHourDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures =
+ new ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
+ for (ResultSet resultSet : oneHourDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+ writePermits.acquire(4);
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(), aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR, aggregate.getScheduleId(),
+ twentyFourHourTimeSlice.getMillis()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally {
+ if (log.isDebugEnabled()) {
+ stopwatch.stop();
+ log.debug("Finished computing and storing 6 hour data for " + oneHourDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ }
+ }
+
+ private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ List<Row> rows = resultSet.all();
+
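+ // Each 1 hour time slice produces three rows, one per aggregate type; as consumed here,
+ // row i holds the max, row i + 1 the min, and row i + 2 the avg, with the value in column 3.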
+ for (int i = 0; i < rows.size(); i += 3) {
+ if (i == 0) {
+ min = rows.get(i + 1).getDouble(3);
+ max = rows.get(i).getDouble(3);
+ } else {
+ if (rows.get(i + 1).getDouble(3) < min) {
+ min = rows.get(i + 1).getDouble(3);
+ }
+ if (rows.get(i).getDouble(3) > max) {
+ max = rows.get(i).getDouble(3);
+ }
+ }
+ mean.add(rows.get(i + 2).getDouble(3));
+ }
+ return new AggregateNumericMetric(rows.get(0).getInt(0), mean.getArithmeticMean(), min, max,
+ startTime.getMillis());
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
new file mode 100644
index 0000000..7d66cef
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
@@ -0,0 +1,498 @@
+package org.rhq.server.metrics;
+
+import static java.util.Arrays.asList;
+import static org.rhq.test.AssertUtils.assertCollectionEqualsNoOrder;
+import static org.testng.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.joda.time.DateTime;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+import org.rhq.server.metrics.aggregation.Aggregator;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsIndexEntry;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Drives the async {@link Aggregator} through successive time slices and verifies the 1 hour, 6 hour,
+ * and 24 hour aggregates and index updates in the database, including a raw data index failure scenario.
+ *
+ * @author John Sanda
+ */
+public class AggregationTests extends MetricsTest {
+
+ private Aggregates schedule1 = new Aggregates();
+ private Aggregates schedule2 = new Aggregates();
+ private Aggregates schedule3 = new Aggregates();
+ private Aggregates schedule4 = new Aggregates();
+ private Aggregates schedule5 = new Aggregates();
+
+ private ListeningExecutorService aggregationTasks;
+
+ private DateTime currentHour;
+
+ private RateLimiter writePermits;
+ private RateLimiter readPermits;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ purgeDB();
+
+ schedule1.id = 100;
+ schedule2.id = 101;
+ schedule3.id = 102;
+ schedule4.id = 104;
+ schedule5.id = 105;
+
+ aggregationTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+ writePermits = RateLimiter.create(500);
+ readPermits = RateLimiter.create(350);
+ }
+
+ @Test
+ public void insertRawDataDuringHour16() throws Exception {
+ insertRawData(
+ new MeasurementDataNumeric(hour(16).plusMinutes(20).getMillis(), schedule1.id, 3.0),
+ new MeasurementDataNumeric(hour(16).plusMinutes(40).getMillis(), schedule1.id, 5.0),
+ new MeasurementDataNumeric(hour(16).plusMinutes(15).getMillis(), schedule2.id, 0.0032),
+ new MeasurementDataNumeric(hour(16).plusMinutes(30).getMillis(), schedule2.id, 0.104),
+ new MeasurementDataNumeric(hour(16).plusMinutes(7).getMillis(), schedule3.id, 3.14)
+ ).await("Failed to insert raw data");
+
+ updateIndex(
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule1.id, hour(16)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule2.id, hour(16)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule3.id, hour(16))
+ ).await("Failed to update raw data index");
+ }
+
+ @Test(dependsOnMethods = "insertRawDataDuringHour16")
+ public void runAggregationForHour16() throws Exception {
+ currentHour = hour(17);
+ AggregatorTestStub aggregator = new AggregatorTestStub(hour(16));
+
+ Set<AggregateNumericMetric> oneHourData = aggregator.run();
+
+ schedule1.oneHourData.put(hour(16), new AggregateNumericMetric(schedule1.id, avg(3.0, 5.0), 3.0, 5.0,
+ hour(16).getMillis()));
+ schedule2.oneHourData.put(hour(16), new AggregateNumericMetric(schedule2.id, avg(0.0032, 0.104), 0.0032, 0.104,
+ hour(16).getMillis()));
+ schedule3.oneHourData.put(hour(16), new AggregateNumericMetric(schedule3.id, 3.14, 3.14, 3.14,
+ hour(16).getMillis()));
+
+ List<AggregateNumericMetric> expected = asList(schedule1.oneHourData.get(hour(16)),
+ schedule2.oneHourData.get(hour(16)), schedule3.oneHourData.get(hour(16)));
+ assertCollectionEqualsNoOrder(expected, oneHourData, "The returned one hour aggregates are wrong");
+ // verify values in the db
+ assert1HourDataEquals(schedule1.id, schedule1.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule2.id, schedule2.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule3.id, schedule3.oneHourData.get(hour(16)));
+ assert6HourIndexEquals(hour(12), schedule1.id, schedule2.id, schedule3.id);
+ assert6HourDataEmpty(schedule1.id);
+ assert6HourDataEmpty(schedule2.id);
+ assert6HourDataEmpty(schedule3.id);
+ assert24HourMetricsIndexEmpty(hour(0));
+ assert1HourMetricsIndexEmpty(hour(16));
+ }
+
+ @Test(dependsOnMethods = "runAggregationForHour16")
+ public void insertRawDataDuringHour17() throws Exception {
+ insertRawData(
+ new MeasurementDataNumeric(hour(17).plusMinutes(20).getMillis(), schedule1.id, 11.0),
+ new MeasurementDataNumeric(hour(17).plusMinutes(40).getMillis(), schedule1.id, 16.0),
+ new MeasurementDataNumeric(hour(17).plusMinutes(30).getMillis(), schedule2.id, 0.092),
+ new MeasurementDataNumeric(hour(17).plusMinutes(45).getMillis(), schedule2.id, 0.0733)
+ ).await("Failed to insert raw data");
+
+ updateIndex(
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule1.id, hour(17)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule2.id, hour(17))
+ ).await("Failed to update raw data index");
+ }
+
+ @Test(dependsOnMethods = "insertRawDataDuringHour17")
+ public void runAggregationForHour17() throws Exception {
+ currentHour = hour(18);
+ AggregatorTestStub aggregator = new AggregatorTestStub(hour(17));
+
+ Set<AggregateNumericMetric> oneHourData = aggregator.run();
+
+ schedule1.oneHourData.put(hour(17), new AggregateNumericMetric(schedule1.id, avg(11.0, 16.0), 11.0, 16.0,
+ hour(17).getMillis()));
+ schedule2.oneHourData.put(hour(17), new AggregateNumericMetric(schedule2.id, avg(0.092, 0.0733), 0.0733, 0.092,
+ hour(17).getMillis()));
+
+ schedule1.sixHourData.put(hour(12), new AggregateNumericMetric(schedule1.id,
+ avg(schedule1.oneHourData, hour(16), hour(17)), min(schedule1.oneHourData, hour(16), hour(17)),
+ max(schedule1.oneHourData, hour(16), hour(17)), hour(12).getMillis()));
+ schedule2.sixHourData.put(hour(12), new AggregateNumericMetric(schedule2.id,
+ avg(schedule2.oneHourData, hour(16), hour(17)), min(schedule2.oneHourData, hour(16), hour(17)),
+ max(schedule2.oneHourData, hour(16), hour(17)), hour(12).getMillis()));
+ schedule3.sixHourData.put(hour(12), new AggregateNumericMetric(schedule3.id, 3.14, 3.14, 3.14,
+ hour(12).getMillis()));
+
+ List<AggregateNumericMetric> expected = asList(schedule1.oneHourData.get(hour(17)),
+ schedule2.oneHourData.get(hour(17)));
+ assertCollectionEqualsNoOrder(expected, oneHourData, "The returned one hour data is wrong");
+ // verify values in the db
+ assert1HourDataEquals(schedule1.id, schedule1.oneHourData.get(hour(16)), schedule1.oneHourData.get(hour(17)));
+ assert1HourDataEquals(schedule2.id, schedule2.oneHourData.get(hour(16)), schedule2.oneHourData.get(hour(17)));
+ assert1HourDataEquals(schedule3.id, schedule3.oneHourData.get(hour(16)));
+ assert6HourDataEquals(schedule1.id, schedule1.sixHourData.get(hour(12)));
+ assert6HourDataEquals(schedule2.id, schedule2.sixHourData.get(hour(12)));
+ assert6HourDataEquals(schedule3.id, schedule3.sixHourData.get(hour(12)));
+ assert24HourDataEmpty(schedule1.id);
+ assert24HourDataEmpty(schedule2.id);
+ assert24HourDataEmpty(schedule3.id);
+ assert1HourMetricsIndexEmpty(hour(17));
+ assert6HourMetricsIndexEmpty(hour(12));
+ assert24HourIndexEquals(hour(0), schedule1.id, schedule2.id, schedule3.id);
+ }
+
+ @Test(dependsOnMethods = "runAggregationForHour17")
+ public void insertRawDataDuringHour18() throws Exception {
+ insertRawData(
+ new MeasurementDataNumeric(hour(18).plusMinutes(20).getMillis(), schedule1.id, 22.0),
+ new MeasurementDataNumeric(hour(18).plusMinutes(40).getMillis(), schedule1.id, 26.0),
+ new MeasurementDataNumeric(hour(18).plusMinutes(15).getMillis(), schedule2.id, 0.205),
+ new MeasurementDataNumeric(hour(18).plusMinutes(15).getMillis(), schedule3.id, 2.42)
+ ).await("Failed to insert raw data");
+
+ updateIndex(
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule1.id, hour(18)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule2.id, hour(18)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule3.id, hour(18))
+ ).await("Failed to update raw data index");
+ }
+
+ @Test(dependsOnMethods = "insertRawDataDuringHour18")
+ public void runAggregationForHour18() throws Exception {
+ currentHour = hour(19);
+ AggregatorTestStub aggregator = new AggregatorTestStub(hour(18));
+
+ Set<AggregateNumericMetric> oneHourData = aggregator.run();
+
+ schedule1.oneHourData.put(hour(18), new AggregateNumericMetric(schedule1.id, avg(22.0, 26.0), 22.0, 26.0,
+ hour(18).getMillis()));
+ schedule2.oneHourData.put(hour(18), new AggregateNumericMetric(schedule2.id, 0.205, 0.205, 0.205,
+ hour(18).getMillis()));
+ schedule3.oneHourData.put(hour(18), new AggregateNumericMetric(schedule3.id, 2.42, 2.42, 2.42,
+ hour(18).getMillis()));
+
+ List<AggregateNumericMetric> expected = asList(schedule1.oneHourData.get(hour(18)),
+ schedule2.oneHourData.get(hour(18)), schedule3.oneHourData.get(hour(18)));
+ assertCollectionEqualsNoOrder(expected, oneHourData, "The returned one hour data is wrong");
+ // verify values in db
+ assert1HourDataEquals(schedule1.id, schedule1.oneHourData.get(hour(18)), schedule1.oneHourData.get(hour(17)),
+ schedule1.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule2.id, schedule2.oneHourData.get(hour(18)), schedule2.oneHourData.get(hour(17)),
+ schedule2.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule3.id, schedule3.oneHourData.get(hour(18)), schedule3.oneHourData.get(hour(16)));
+ assert6HourDataEquals(schedule1.id, schedule1.sixHourData.get(hour(12)));
+ assert6HourDataEquals(schedule2.id, schedule2.sixHourData.get(hour(12)));
+ assert6HourDataEquals(schedule3.id, schedule3.sixHourData.get(hour(12)));
+ assert6HourIndexEquals(hour(18), schedule1.id, schedule2.id, schedule3.id);
+ assert24HourDataEmpty(schedule1.id);
+ assert24HourDataEmpty(schedule2.id);
+ assert24HourDataEmpty(schedule3.id);
+ assert24HourIndexEquals(hour(0), schedule1.id, schedule2.id, schedule3.id);
+ assert1HourMetricsIndexEmpty(hour(18));
+ }
+
+ @Test(dependsOnMethods = "runAggregationForHour18")
+ public void insertRawDataDuringHour23() throws Exception {
+ insertRawData(
+ new MeasurementDataNumeric(hour(23).plusMinutes(25).getMillis(), schedule1.id, 34.0),
+ new MeasurementDataNumeric(hour(23).plusMinutes(30).getMillis(), schedule2.id, 0.322)
+ ).await("Failed to insert raw data");
+
+ updateIndex(
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule1.id, hour(23)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule2.id, hour(23))
+ ).await("Failed to update raw data index");
+ }
+
+ @Test(dependsOnMethods = "insertRawDataDuringHour23")
+ public void runAggregationForHour24() throws Exception {
+ currentHour = hour(24);
+ AggregatorTestStub aggregator = new AggregatorTestStub(hour(23));
+
+ Set<AggregateNumericMetric> oneHourData = aggregator.run();
+
+ schedule1.oneHourData.put(hour(23), new AggregateNumericMetric(schedule1.id, 34.0, 34.0, 34.0,
+ hour(23).getMillis()));
+ schedule1.sixHourData.put(hour(18), new AggregateNumericMetric(schedule1.id,
+ avg(schedule1.oneHourData, hour(18), hour(23)),
+ min(schedule1.oneHourData, hour(18), hour(23)),
+ max(schedule1.oneHourData, hour(18), hour(23)),
+ hour(18).getMillis()));
+ schedule1.twentyFourHourData.put(hour(0),
+ new AggregateNumericMetric(schedule1.id,
+ avg(schedule1.sixHourData, hour(12), hour(18)),
+ min(schedule1.sixHourData, hour(12), hour(18)),
+ max(schedule1.sixHourData, hour(12), hour(18)),
+ hour(0).getMillis()));
+ schedule2.oneHourData.put(hour(23), new AggregateNumericMetric(schedule2.id, 0.322, 0.322, 0.322,
+ hour(23).getMillis()));
+ schedule2.sixHourData.put(hour(18), new AggregateNumericMetric(schedule2.id,
+ avg(schedule2.oneHourData, hour(18), hour(23)),
+ min(schedule2.oneHourData, hour(18), hour(23)),
+ max(schedule2.oneHourData, hour(18), hour(23)),
+ hour(18).getMillis()));
+ schedule2.twentyFourHourData.put(hour(0), new AggregateNumericMetric(schedule2.id,
+ avg(schedule2.sixHourData, hour(12), hour(18)),
+ min(schedule2.sixHourData, hour(12), hour(18)),
+ max(schedule2.sixHourData, hour(12), hour(18)),
+ hour(0).getMillis()));
+ schedule3.sixHourData.put(hour(18), new AggregateNumericMetric(schedule3.id, 2.42, 2.42, 2.42,
+ hour(18).getMillis()));
+ schedule3.twentyFourHourData.put(hour(0), new AggregateNumericMetric(schedule3.id,
+ avg(schedule3.sixHourData, hour(12), hour(18)),
+ min(schedule3.sixHourData, hour(12), hour(18)),
+ max(schedule3.sixHourData, hour(12), hour(18)),
+ hour(0).getMillis()));
+
+ List<AggregateNumericMetric> expected = asList(schedule1.oneHourData.get(hour(23)),
+ schedule2.oneHourData.get(hour(23)));
+
+ assertCollectionEqualsNoOrder(expected, oneHourData, "The returned one hour data is wrong");
+ // verify values in db
+ assert1HourDataEquals(schedule1.id, schedule1.oneHourData.get(hour(23)), schedule1.oneHourData.get(hour(18)),
+ schedule1.oneHourData.get(hour(17)), schedule1.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule2.id, schedule2.oneHourData.get(hour(23)), schedule2.oneHourData.get(hour(18)),
+ schedule2.oneHourData.get(hour(17)), schedule2.oneHourData.get(hour(16)));
+ assert1HourDataEquals(schedule3.id, schedule3.oneHourData.get(hour(18)), schedule3.oneHourData.get(hour(16)));
+ assert6HourDataEquals(schedule1.id, schedule1.sixHourData.get(hour(12)), schedule1.sixHourData.get(hour(18)));
+ assert6HourDataEquals(schedule2.id, schedule2.sixHourData.get(hour(12)), schedule2.sixHourData.get(hour(18)));
+ assert6HourDataEquals(schedule3.id, schedule3.sixHourData.get(hour(12)), schedule3.sixHourData.get(hour(18)));
+ assert24HourDataEquals(schedule1.id, schedule1.twentyFourHourData.get(hour(0)));
+ assert24HourDataEquals(schedule2.id, schedule2.twentyFourHourData.get(hour(0)));
+ assert24HourDataEquals(schedule3.id, schedule3.twentyFourHourData.get(hour(0)));
+ assert1HourMetricsIndexEmpty(hour(23));
+ assert6HourMetricsIndexEmpty(hour(18));
+ assert24HourMetricsIndexEmpty(hour(0));
+ }
+
+ @Test(dependsOnMethods = "runAggregationForHour24")
+ public void resetDBForFailureScenarios() throws Exception {
+ purgeDB();
+ }
+
+ @Test(dependsOnMethods = "resetDBForFailureScenarios")
+ public void failToFetchRawDataIndexDuringAggregationForHour12() throws Exception {
+ currentHour = hour(12);
+ AggregatorTestStub aggregator = new AggregatorTestStub(hour(11), new MetricsDAO(storageSession, configuration) {
+ @Override
+ public StorageResultSetFuture findMetricsIndexEntriesAsync(MetricsTable table, long timestamp) {
+ if (table == MetricsTable.ONE_HOUR) {
+ return new FailedStorageResultSetFuture(new Exception("Failed to fetch raw data index"));
+ } else {
+ return super.findMetricsIndexEntriesAsync(table, timestamp);
+ }
+ }
+ });
+
+ insertRawData(
+ new MeasurementDataNumeric(hour(12).plusMinutes(10).getMillis(), schedule4.id, 7.456),
+ new MeasurementDataNumeric(hour(12).plusMinutes(14).getMillis(), schedule5.id, 29.3)
+ ).await("Failed to insert raw data");
+
+ updateIndex(
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule4.id, hour(12)),
+ new IndexUpdate(MetricsTable.ONE_HOUR, schedule5.id, hour(12))
+ ).await("Failed to update raw data index");
+
+ insert1HourData(
+ new AggregateNumericMetric(schedule4.id, 26.6, 18.33, 29.02, hour(10).getMillis()),
+ new AggregateNumericMetric(schedule4.id, 25.2, 21.12, 28.05, hour(11).getMillis())
+ ).await("Failed to insert 1 hour data");
+
+ updateIndex(new IndexUpdate(MetricsTable.SIX_HOUR, schedule4.id, hour(6)))
+ .await("Failed to update 1 hr data index");
+
+ schedule4.oneHourData.put(hour(10), new AggregateNumericMetric(schedule4.id, 26.6, 18.33, 29.02,
+ hour(10).getMillis()));
+ schedule4.oneHourData.put(hour(11), new AggregateNumericMetric(schedule4.id, 25.2, 21.12, 28.05,
+ hour(11).getMillis()));
+ schedule4.sixHourData.put(hour(6), new AggregateNumericMetric(schedule4.id,
+ avg(schedule4.oneHourData, hour(10), hour(11)),
+ min(schedule4.oneHourData, hour(10), hour(11)),
+ max(schedule4.oneHourData, hour(10), hour(11)),
+ hour(6).getMillis()));
+
+ Set<AggregateNumericMetric> oneHourData = aggregator.run();
+ List<AggregateNumericMetric> emptyAggregates = Collections.emptyList();
+
+ assertTrue(oneHourData.isEmpty(), "Did not expect to get back any one hour aggregates");
+ // verify values in db
+ assert1HourDataEquals(schedule4.id, schedule4.oneHourData.get(hour(10)), schedule4.oneHourData.get(hour(11)));
+ assert1HourDataEquals(schedule5.id, emptyAggregates);
+ assert6HourDataEquals(schedule4.id, schedule4.sixHourData.get(hour(6)));
+ assert6HourDataEmpty(schedule5.id);
+ assert24HourDataEmpty(schedule4.id);
+ assert24HourDataEmpty(schedule5.id);
+ assert1HourMetricsIndexEmpty(hour(11));
+ assert6HourMetricsIndexEmpty(hour(6));
+ assert24HourIndexEquals(hour(0), schedule4.id);
+ }
+
+ private WaitForWrite insertRawData(MeasurementDataNumeric... data) {
+ WaitForWrite waitForRawInserts = new WaitForWrite(data.length);
+ for (MeasurementDataNumeric raw : data) {
+ StorageResultSetFuture resultSetFuture = dao.insertRawData(raw);
+ Futures.addCallback(resultSetFuture, waitForRawInserts);
+ }
+ return waitForRawInserts;
+ }
+
+ private WaitForWrite insert1HourData(AggregateNumericMetric... data) {
+ WaitForWrite waitForWrite = new WaitForWrite(data.length * 3);
+ for (AggregateNumericMetric datum : data) {
+ StorageResultSetFuture future = dao.insertOneHourDataAsync(datum.getScheduleId(), datum.getTimestamp(),
+ AggregateType.AVG, datum.getAvg());
+ Futures.addCallback(future, waitForWrite);
+
+ future = dao.insertOneHourDataAsync(datum.getScheduleId(), datum.getTimestamp(), AggregateType.MIN,
+ datum.getMin());
+ Futures.addCallback(future, waitForWrite);
+
+ future = dao.insertOneHourDataAsync(datum.getScheduleId(), datum.getTimestamp(), AggregateType.MAX,
+ datum.getMax());
+ Futures.addCallback(future, waitForWrite);
+ }
+ return waitForWrite;
+ }
+
+ private WaitForWrite updateIndex(IndexUpdate... updates) {
+ WaitForWrite waitForWrite = new WaitForWrite(updates.length);
+ for (IndexUpdate update : updates) {
+ StorageResultSetFuture future = dao.updateMetricsIndex(update.table, update.scheduleId,
+ update.time.getMillis());
+ Futures.addCallback(future, waitForWrite);
+ }
+ return waitForWrite;
+ }
+
+ private double avg(Map<DateTime, AggregateNumericMetric> data, DateTime... times) {
+ double[] values = new double[times.length];
+ for (int i = 0; i < times.length; ++i) {
+ values[i] = data.get(times[i]).getAvg();
+ }
+ return avg(values);
+ }
+
+ private double min(Map<DateTime, AggregateNumericMetric> data, DateTime... times) {
+ double min = data.get(times[0]).getMin();
+ for (DateTime time : times) {
+ if (data.get(time).getMin() < min) {
+ min = data.get(time).getMin();
+ }
+ }
+ return min;
+ }
+
+ private double max(Map<DateTime, AggregateNumericMetric> data, DateTime... times) {
+ double max = data.get(times[0]).getMax();
+ for (DateTime time : times) {
+ if (data.get(time).getMax() > max) {
+ max = data.get(time).getMax();
+ }
+ }
+ return max;
+ }
+
+ protected void assert6HourIndexEquals(DateTime timeSlice, int... scheduleIds) {
+ List<MetricsIndexEntry> indexEntries = new ArrayList<MetricsIndexEntry>(scheduleIds.length);
+ for (int scheduleId : scheduleIds) {
+ indexEntries.add(new MetricsIndexEntry(MetricsTable.SIX_HOUR, timeSlice, scheduleId));
+ }
+ assertMetricsIndexEquals(MetricsTable.SIX_HOUR, timeSlice.getMillis(), indexEntries,
+ "The 6 hour index is wrong");
+ }
+
+ protected void assert24HourIndexEquals(DateTime timeSlice, int... scheduleIds) {
+ List<MetricsIndexEntry> indexEntries = new ArrayList<MetricsIndexEntry>(scheduleIds.length);
+ for (int scheduleId : scheduleIds) {
+ indexEntries.add(new MetricsIndexEntry(MetricsTable.TWENTY_FOUR_HOUR, timeSlice, scheduleId));
+ }
+ assertMetricsIndexEquals(MetricsTable.TWENTY_FOUR_HOUR, timeSlice.getMillis(), indexEntries,
+ "The 24 hour index is wrong");
+ }
+
+ private class AggregatorTestStub extends Aggregator {
+
+ public AggregatorTestStub(DateTime startTime) {
+ super(aggregationTasks, dao, configuration, dateTimeService, startTime, 250, writePermits, readPermits);
+ }
+
+ public AggregatorTestStub(DateTime startTime, MetricsDAO dao) {
+ super(aggregationTasks, dao, configuration, dateTimeService, startTime, 250, writePermits, readPermits);
+ }
+
+ @Override
+ protected DateTime currentHour() {
+ return currentHour;
+ }
+ }
+
+ private class IndexUpdate {
+ MetricsTable table;
+ int scheduleId;
+ DateTime time;
+
+ public IndexUpdate(MetricsTable table, int scheduleId, DateTime time) {
+ this.table = table;
+ this.scheduleId = scheduleId;
+ this.time = time;
+ }
+ }
+
+ private class Aggregates {
+ int id; // schedule id
+ Map<DateTime, AggregateNumericMetric> oneHourData = new HashMap<DateTime, AggregateNumericMetric>();
+ Map<DateTime, AggregateNumericMetric> sixHourData = new HashMap<DateTime, AggregateNumericMetric>();
+ Map<DateTime, AggregateNumericMetric> twentyFourHourData = new HashMap<DateTime, AggregateNumericMetric>();
+ }
+
+ private class FailedStorageResultSetFuture extends StorageResultSetFuture implements ListenableFuture<ResultSet> {
+
+ private SettableFuture<ResultSet> future;
+
+ private Throwable t;
+
+ public FailedStorageResultSetFuture(Throwable t) {
+ super(null, null);
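+ // The wrapped session and statement are never used; this stub only exposes a future
+ // that has already failed with the given exception.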
+ future = SettableFuture.create();
+ this.t = t;
+ assertTrue(future.setException(t), "Failed to set exception for future");
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ future.addListener(listener, executor);
+ }
+
+ @Override
+ public ResultSet get() {
+ throw new AssertionError();
+ }
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CassandraIntegrationTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CassandraIntegrationTest.java
index f7125e5..4d48cbb 100644
--- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CassandraIntegrationTest.java
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/CassandraIntegrationTest.java
@@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
@@ -79,6 +81,12 @@ public class CassandraIntegrationTest {
.withCredentialsObfuscated(RHQADMIN, RHQADMIN_PASSWORD)
.build();
+ PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 24);
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 24);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 32);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, 32);
+
cluster.register(new Host.StateListener() {
@Override
public void onAdd(Host host) {
@@ -145,39 +153,6 @@ public class CassandraIntegrationTest {
}
}
- protected static class WaitForWrite implements FutureCallback<ResultSet> {
-
- private final Log log = LogFactory.getLog(WaitForWrite.class);
-
- private CountDownLatch latch;
-
- private Throwable throwable;
-
- public WaitForWrite(int numResults) {
- latch = new CountDownLatch(numResults);
- }
-
- @Override
- public void onSuccess(ResultSet rows) {
- latch.countDown();
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- latch.countDown();
- this.throwable = throwable;
- log.error("An async operation failed", throwable);
- }
-
- public void await(String errorMsg) throws InterruptedException {
- latch.await();
- if (throwable != null) {
- fail(errorMsg, Throwables.getRootCause(throwable));
- }
- }
-
- }
-
protected static class WaitForRead<T> implements FutureCallback<ResultSet> {
private final Log log = LogFactory.getLog(WaitForRead.class);
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
index b06dfda..fff0485 100644
--- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
@@ -29,6 +29,7 @@ import static java.util.Arrays.asList;
import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +39,9 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import com.datastax.driver.core.ResultSet;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.logging.Log;
@@ -54,6 +57,7 @@ import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateSimpleNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
+import org.rhq.server.metrics.domain.MetricsIndexEntryMapper;
import org.rhq.server.metrics.domain.MetricsTable;
import org.rhq.server.metrics.domain.RawNumericMetric;
import org.rhq.server.metrics.domain.RawNumericMetricMapper;
@@ -352,12 +356,24 @@ public class MetricsDAOTest extends CassandraIntegrationTest {
updates.put(scheduleId2, hour0.getMillis());
dao.updateMetricsIndex(MetricsTable.ONE_HOUR, updates);
- List<MetricsIndexEntry> actual = Lists.newArrayList(dao.findMetricsIndexEntries(MetricsTable.ONE_HOUR,
- hour0.getMillis()));
-
- List<MetricsIndexEntry> expected = asList(new MetricsIndexEntry(MetricsTable.ONE_HOUR, hour0, scheduleId1),
+ final List<MetricsIndexEntry> expected = asList(new MetricsIndexEntry(MetricsTable.ONE_HOUR, hour0, scheduleId1),
new MetricsIndexEntry(MetricsTable.ONE_HOUR, hour0, scheduleId2));
- assertCollectionMatchesNoOrder(expected, actual, "Failed to update or retrieve metrics index entries");
+
+ StorageResultSetFuture future = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR, hour0.getMillis());
+ Futures.addCallback(future, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ MetricsIndexEntryMapper mapper = new MetricsIndexEntryMapper(MetricsTable.ONE_HOUR);
+ List<MetricsIndexEntry> actual = mapper.mapAll(result);
+
+ assertCollectionMatchesNoOrder(expected, actual, "Failed to update or retrieve metrics index entries");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fail("Failed to retrieve one hour index entries", t);
+ }
+ });
}
@Test(enabled = ENABLED)
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsPerfTests.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsPerfTests.java
new file mode 100644
index 0000000..6e93fec
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsPerfTests.java
@@ -0,0 +1,191 @@
+package org.rhq.server.metrics;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.RawNumericMetric;
+import org.rhq.server.metrics.domain.RawNumericMetricMapper;
+
+/**
+ * Ad hoc performance tests that load an hour of raw data for many schedules and time the aggregation
+ * run; the async and sync raw data query comparisons are currently disabled.
+ *
+ * @author John Sanda
+ */
+public class MetricsPerfTests extends MetricsTest {
+
+ private class MetricsServerStub extends MetricsServer {
+
+ DateTime currentHour;
+
+ @Override
+ public DateTime currentHour() {
+ return currentHour;
+ }
+
+ public void setCurrentHour(DateTime currentHour) {
+ this.currentHour = currentHour;
+ }
+ }
+
+ private class DateTimeServiceStub extends DateTimeService {
+
+ DateTime currentHour;
+
+ long startTime;
+
+ public DateTimeServiceStub(DateTime currentHour, long startTime) {
+ this.currentHour = currentHour;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public DateTime now() {
+ return currentHour.plus(System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public long nowInMillis() {
+ return now().getMillis();
+ }
+ }
+
+ private final Log log = LogFactory.getLog(MetricsPerfTests.class);
+
+ private MetricsServerStub metricsServer;
+
+ private final int NUM_SCHEDULES = 1000;
+
+ private RateLimiter writePermits;
+ private RateLimiter readPermits;
+
+ @BeforeClass
+ public void setupClass() throws Exception {
+ purgeDB();
+ log.info("Sleeping while table truncation completes...");
+ Thread.sleep(3000);
+ metricsServer = new MetricsServerStub();
+ metricsServer.setConfiguration(configuration);
+ metricsServer.setDAO(dao);
+ metricsServer.setDateTimeService(dateTimeService);
+ writePermits = metricsServer.getWritePermits();
+ readPermits = metricsServer.getReadPermits();
+ }
+
+ private void resetRateLimits() {
+ metricsServer.setWritePermits(writePermits);
+ metricsServer.setReadPermits(readPermits);
+ }
+
+ @Test
+ public void insertRawData() throws Exception {
+ Random random = new Random();
+ DateTime currentHour = hour(3);
+ metricsServer.setWritePermits(RateLimiter.create(3500));
+ metricsServer.setCurrentHour(currentHour);
+ Set<MeasurementDataNumeric> data = new HashSet<MeasurementDataNumeric>();
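+ // 120 samples at 30 second intervals fills exactly one hour of raw data per schedule.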
+ for (int i = 0; i < NUM_SCHEDULES; ++i) {
+ DateTime time = currentHour;
+ for (int j = 0; j < 120; ++j) {
+ data.add(new MeasurementDataNumeric(time.getMillis(), i, random.nextDouble()));
+ time = time.plusSeconds(30);
+ }
+ }
+ WaitForRawInserts waitForRawInserts = new WaitForRawInserts(data.size());
+ metricsServer.addNumericData(data, waitForRawInserts);
+ waitForRawInserts.await("Failed to add raw data");
+ }
+
+ //@Test(dependsOnMethods = "insertRawData")
+ public void queryRawDataAsync() throws Exception {
+ RateLimiter readPermits = RateLimiter.create(50);
+
+ log.info("Running queryRawDataAsync");
+ long start = System.currentTimeMillis();
+
+ DateTime startTime = hour(3).minusHours(1).minusSeconds(1);
+ DateTime endTime = hour(3);
+ final CountDownLatch rawDataArrival = new CountDownLatch(100);
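+ // The latch waits for only the first 100 of the NUM_SCHEDULES queries, and onFailure never
+ // counts down, so the timing below covers the first 100 successful reads.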
+ final RawNumericMetricMapper mapper = new RawNumericMetricMapper();
+ final Map<Integer, List<RawNumericMetric>> rawDataMap =
+ new ConcurrentHashMap<Integer, List<RawNumericMetric>>(100);
+
+ for (int i = 0; i < NUM_SCHEDULES; ++i) {
+ final int scheduleId = i;
+// readPermits.acquire();
+ StorageResultSetFuture rawDataFuture = dao.findRawMetricsAsync(scheduleId, startTime.getMillis(),
+ endTime.getMillis());
+ Futures.addCallback(rawDataFuture, new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(ResultSet result) {
+ List<RawNumericMetric> rawData = mapper.mapAll(result);
+ rawDataMap.put(scheduleId, rawData);
+ rawDataArrival.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("Failed to retrieve raw data for schedule id " + scheduleId, t);
+ }
+ });
+ }
+
+ rawDataArrival.await();
+ log.info("Finished raw data aysnc query in " + (System.currentTimeMillis() - start) + " ms");
+ }
+
+ //@Test(dependsOnMethods = "insertRawData")
+ public void queryDataSync() throws Exception {
+ log.info("Running queryDataSync");
+
+ long start = System.currentTimeMillis();
+ DateTime startTime = hour(3).minusHours(1).minusSeconds(1);
+ DateTime endTime = hour(3);
+ RawNumericMetricMapper mapper = new RawNumericMetricMapper();
+ Map<Integer, List<RawNumericMetric>> rawDataMap = new HashMap<Integer, List<RawNumericMetric>>(100);
+
+ for (int i = 0; i < NUM_SCHEDULES; ++i) {
+ ResultSet resultSet = dao.findRawMetricsSync(i, startTime.getMillis(), endTime.getMillis());
+ rawDataMap.put(i, mapper.mapAll(resultSet));
+ }
+
+ log.info("Finished raw data sync query in " + (System.currentTimeMillis() - start) + " ms");
+ }
+
+ @Test(dependsOnMethods = "insertRawData")
+ public void runAggregation() {
+ log.info("Running aggregation");
+
+ resetRateLimits();
+
+ long start = System.currentTimeMillis();
+ DateTime currentHour = hour(4);
+ metricsServer.setCurrentHour(currentHour);
+ metricsServer.setAggregationBatchSize(250);
+ metricsServer.setUseAsyncAggregation(false);
+ metricsServer.setDateTimeService(new DateTimeServiceStub(hour(4), start));
+ Collection<AggregateNumericMetric> oneHourData =
+ (Collection<AggregateNumericMetric>) metricsServer.calculateAggregates();
+
+ log.info("Finished computing " + oneHourData.size() + " one hour aggregates in " +
+ (System.currentTimeMillis() - start) + " ms");
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
index 95ada4c..b3ffb66 100644
--- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
@@ -31,7 +31,6 @@ import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder;
import static org.rhq.test.AssertUtils.assertPropertiesMatch;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
import java.math.BigDecimal;
import java.math.MathContext;
@@ -40,10 +39,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
@@ -1011,39 +1008,4 @@ public class MetricsServerTest extends CassandraIntegrationTest {
return new SimplePagedResult<RawNumericMetric>(cql, new RawNumericMetricMapper(true), storageSession);
}
- private static class WaitForRawInserts implements RawDataInsertedCallback {
-
- private final Log log = LogFactory.getLog(WaitForRawInserts.class);
-
- private CountDownLatch latch;
-
- private Throwable throwable;
-
- public WaitForRawInserts(int numInserts) {
- latch = new CountDownLatch(numInserts);
- }
-
- @Override
- public void onFinish() {
- }
-
- @Override
- public void onSuccess(MeasurementDataNumeric measurementDataNumeric) {
- latch.countDown();
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- latch.countDown();
- this.throwable = throwable;
- log.error("An async operation failed", throwable);
- }
-
- public void await(String errorMsg) throws InterruptedException {
- latch.await();
- if (throwable != null) {
- fail(errorMsg, Throwables.getRootCause(throwable));
- }
- }
- }
}
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsTest.java
new file mode 100644
index 0000000..59e0b77
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsTest.java
@@ -0,0 +1,138 @@
+package org.rhq.server.metrics;
+
+import static java.util.Arrays.asList;
+import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder;
+import static org.testng.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.joda.time.DateTime;
+import org.testng.annotations.BeforeClass;
+
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.MetricsIndexEntry;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Base class for metrics integration tests. Provides the DAO and configuration, time slice helpers,
+ * and assertions against the aggregate and index tables.
+ *
+ * @author John Sanda
+ */
+public class MetricsTest extends CassandraIntegrationTest {
+
+ private static final double TEST_PRECISION = Math.pow(10, -9);
+
+ protected MetricsDAO dao;
+ protected MetricsConfiguration configuration = new MetricsConfiguration();
+ protected DateTimeService dateTimeService;
+
+ @BeforeClass
+ public void initClass() throws Exception {
+ dao = new MetricsDAO(storageSession, configuration);
+ dateTimeService = new DateTimeService();
+ dateTimeService.setConfiguration(configuration);
+ }
+
+ protected DateTime hour(int hourOfDay) {
+ return hour0().plusHours(hourOfDay);
+ }
+
+ protected double avg(double... values) {
+ double sum = 0;
+ for (double value : values) {
+ sum += value;
+ }
+ return divide(sum, values.length);
+ }
+
+ protected double divide(double dividend, int divisor) {
+ return new BigDecimal(Double.toString(dividend)).divide(new BigDecimal(Integer.toString(divisor)),
+ MathContext.DECIMAL64).doubleValue();
+ }
+
+ protected void purgeDB() {
+ session.execute("TRUNCATE " + MetricsTable.RAW);
+ session.execute("TRUNCATE " + MetricsTable.ONE_HOUR);
+ session.execute("TRUNCATE " + MetricsTable.SIX_HOUR);
+ session.execute("TRUNCATE " + MetricsTable.TWENTY_FOUR_HOUR);
+ session.execute("TRUNCATE " + MetricsTable.INDEX);
+ }
+
+ protected void assert1HourDataEquals(int scheduleId, AggregateNumericMetric... expected) {
+ assert1HourDataEquals(scheduleId, asList(expected));
+ }
+
+ protected void assert1HourDataEquals(int scheduleId, Collection<AggregateNumericMetric> expected) {
+ assert1HourDataEquals(scheduleId, new ArrayList<AggregateNumericMetric>(expected));
+ }
+
+ protected void assert1HourDataEquals(int scheduleId, List<AggregateNumericMetric> expected) {
+ assertMetricDataEquals(MetricsTable.ONE_HOUR, scheduleId, expected);
+ }
+
+ protected void assert6HourDataEquals(int scheduleId, AggregateNumericMetric... expected) {
+ assert6HourDataEquals(scheduleId, asList(expected));
+ }
+
+ protected void assert6HourDataEquals(int scheduleId, List<AggregateNumericMetric> expected) {
+ assertMetricDataEquals(MetricsTable.SIX_HOUR, scheduleId, expected);
+ }
+
+ protected void assert24HourDataEquals(int scheduleId, List<AggregateNumericMetric> expected) {
+ assertMetricDataEquals(MetricsTable.TWENTY_FOUR_HOUR, scheduleId, expected);
+ }
+
+ protected void assert24HourDataEquals(int scheduleId, AggregateNumericMetric... expected) {
+ assertMetricDataEquals(MetricsTable.TWENTY_FOUR_HOUR, scheduleId, asList(expected));
+ }
+
+ private void assertMetricDataEquals(MetricsTable columnFamily, int scheduleId,
+ List<AggregateNumericMetric> expected) {
+ List<AggregateNumericMetric> actual = Lists.newArrayList(findAggregateMetrics(columnFamily, scheduleId));
+ assertCollectionMatchesNoOrder("Metric data for schedule id " + scheduleId + " in table " + columnFamily +
+ " does not match expected values", expected, actual, TEST_PRECISION);
+ }
+
+ protected void assertMetricsIndexEquals(MetricsTable table, long timeSlice, List<MetricsIndexEntry> expected,
+ String msg) {
+ List<MetricsIndexEntry> actual = Lists.newArrayList(dao.findMetricsIndexEntries(table, timeSlice));
+ assertCollectionMatchesNoOrder(msg + ": " + table + " index does not match expected values.",
+ expected, actual);
+ }
+
+ protected void assert6HourDataEmpty(int scheduleId) {
+ assertMetricDataEmpty(scheduleId, MetricsTable.SIX_HOUR);
+ }
+
+ protected void assert24HourDataEmpty(int scheduleId) {
+ assertMetricDataEmpty(scheduleId, MetricsTable.TWENTY_FOUR_HOUR);
+ }
+
+ private void assertMetricDataEmpty(int scheduleId, MetricsTable columnFamily) {
+ List<AggregateNumericMetric> metrics = Lists.newArrayList(findAggregateMetrics(columnFamily, scheduleId));
+ assertEquals(metrics.size(), 0, "Expected " + columnFamily + " to be empty for schedule id " + scheduleId +
+ " but found " + metrics);
+ }
+
+ protected void assert1HourMetricsIndexEmpty(DateTime timeSlice) {
+ assertMetricsIndexEmpty(MetricsTable.ONE_HOUR, timeSlice);
+ }
+
+ protected void assert6HourMetricsIndexEmpty(DateTime timeSlice) {
+ assertMetricsIndexEmpty(MetricsTable.SIX_HOUR, timeSlice);
+ }
+
+ protected void assert24HourMetricsIndexEmpty(DateTime timeSlice) {
+ assertMetricsIndexEmpty(MetricsTable.TWENTY_FOUR_HOUR, timeSlice);
+ }
+
+ private void assertMetricsIndexEmpty(MetricsTable table, DateTime timeSlice) {
+ List<MetricsIndexEntry> index = Lists.newArrayList(dao.findMetricsIndexEntries(table, timeSlice.getMillis()));
+ assertEquals(index.size(), 0, "Expected metrics index for " + table + " to be empty but found " + index);
+ }
+
+}
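For orientation, a minimal sketch of how a subclass might lean on these helpers. The schedule id and sample values are hypothetical, and the AggregateNumericMetric constructor order (scheduleId, avg, min, max, timestamp) is assumed from its use elsewhere in this commit:

    public class ExampleRollupTest extends MetricsTest {

        public void verifyOneHourRollup() {
            purgeDB();
            int scheduleId = 100;
            // after aggregation has run, the 1 hour table should hold one rollup
            // for raw values 2.0, 4.0, 6.0 collected during hour 3
            double expectedAvg = avg(2.0, 4.0, 6.0); // BigDecimal-backed, avoids float drift
            assert1HourDataEquals(scheduleId,
                new AggregateNumericMetric(scheduleId, expectedAvg, 2.0, 6.0, hour(3).getMillis()));
            // nothing should have been rolled up beyond 1 hour for this schedule yet
            assert6HourDataEmpty(scheduleId);
            assert24HourDataEmpty(scheduleId);
        }
    }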
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForRawInserts.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForRawInserts.java
new file mode 100644
index 0000000..5feffda
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForRawInserts.java
@@ -0,0 +1,51 @@
+package org.rhq.server.metrics;
+
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.base.Throwables;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+
+/**
+ * @author John Sanda
+ */
+class WaitForRawInserts implements RawDataInsertedCallback {
+
+ private final Log log = LogFactory.getLog(WaitForRawInserts.class);
+
+ private final CountDownLatch latch;
+
+ private Throwable throwable;
+
+ public WaitForRawInserts(int numInserts) {
+ latch = new CountDownLatch(numInserts);
+ }
+
+ @Override
+ public void onFinish() {
+ }
+
+ @Override
+ public void onSuccess(MeasurementDataNumeric measurementDataNumeric) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ // record the failure before releasing the latch so await() is guaranteed to see it
+ this.throwable = throwable;
+ log.error("An async operation failed", throwable);
+ latch.countDown();
+ }
+
+ public void await(String errorMsg) throws InterruptedException {
+ latch.await();
+ if (throwable != null) {
+ fail(errorMsg, Throwables.getRootCause(throwable));
+ }
+ }
+}
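Typical use, sketched on the assumption that MetricsServer.addNumericData accepts this callback alongside the data set (as the updated tests in this commit do); the timestamp, schedule id, and value are illustrative:

    Set<MeasurementDataNumeric> data = ImmutableSet.of(
        new MeasurementDataNumeric(hour(4).getMillis(), 100, 3.14));
    WaitForRawInserts waitForRawInserts = new WaitForRawInserts(data.size());
    metricsServer.addNumericData(data, waitForRawInserts);
    // blocks until every insert has succeeded or failed, then fails the test
    // with the root cause if any insert reported an error
    waitForRawInserts.await("Failed to insert raw data");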
diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForWrite.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForWrite.java
new file mode 100644
index 0000000..b67c72d
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/WaitForWrite.java
@@ -0,0 +1,48 @@
+package org.rhq.server.metrics;
+
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author John Sanda
+ */
+class WaitForWrite implements FutureCallback<ResultSet> {
+
+ private final Log log = LogFactory.getLog(WaitForWrite.class);
+
+ private final CountDownLatch latch;
+
+ private Throwable throwable;
+
+ public WaitForWrite(int numResults) {
+ latch = new CountDownLatch(numResults);
+ }
+
+ @Override
+ public void onSuccess(ResultSet rows) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ // record the failure before releasing the latch so await() is guaranteed to see it
+ this.throwable = throwable;
+ log.error("An async operation failed", throwable);
+ latch.countDown();
+ }
+
+ public void await(String errorMsg) throws InterruptedException {
+ latch.await();
+ if (throwable != null) {
+ fail(errorMsg, Throwables.getRootCause(throwable));
+ }
+ }
+
+}
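WaitForWrite plays the same role one level down, directly against driver futures. A sketch, assuming MetricsDAO.insertRawData returns a ListenableFuture-compatible StorageResultSetFuture (as it does in this module) and that rawData is a previously built List<MeasurementDataNumeric>:

    WaitForWrite waitForWrite = new WaitForWrite(rawData.size());
    for (MeasurementDataNumeric datum : rawData) {
        Futures.addCallback(dao.insertRawData(datum), waitForWrite);
    }
    // surfaces the root cause of the first failed write, if any
    waitForWrite.await("Failed to insert raw data");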
diff --git a/modules/enterprise/server/server-metrics/src/test/resources/log4j.xml b/modules/enterprise/server/server-metrics/src/test/resources/log4j.xml
index d93f284..4ce4d9e 100644
--- a/modules/enterprise/server/server-metrics/src/test/resources/log4j.xml
+++ b/modules/enterprise/server/server-metrics/src/test/resources/log4j.xml
@@ -8,7 +8,7 @@
<appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
- <param name="Threshold" value="WARN" />
+ <param name="Threshold" value="DEBUG" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%-5p %d{dd-MM HH:mm:ss,SSS} (%F:%M:%L) - %m%n" />
</layout>
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
index 7a7c6b9..b5f61f3 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
@@ -49,12 +49,15 @@ public class MeasurementAggregator implements Runnable {
private ShutdownManager shutdownManager;
+ private int numSchedules;
+
public MeasurementAggregator(MetricsServer metricsServer, ShutdownManager shutdownManager, Metrics metrics,
- ExecutorService aggregationQueue) {
+ ExecutorService aggregationQueue, int numSchedules) {
this.metricsServer = metricsServer;
this.shutdownManager = shutdownManager;
this.metrics = metrics;
this.aggregationQueue = aggregationQueue;
+ this.numSchedules = numSchedules;
}
public void run() {
@@ -62,6 +65,7 @@ public class MeasurementAggregator implements Runnable {
@Override
public void run() {
Timer.Context context = metrics.totalAggregationTime.time();
+ long start = System.currentTimeMillis();
try {
log.debug("Starting metrics aggregation");
metricsServer.calculateAggregates();
@@ -71,6 +75,7 @@ public class MeasurementAggregator implements Runnable {
shutdownManager.shutdown(1);
} finally {
context.stop();
+ log.debug("Finished metrics aggregation in " + (System.currentTimeMillis() - start) + " ms");
metrics.totalAggregationRuns.inc();
}
}
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
index 63bdbea..3e9b4af 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ConsoleReporter;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -65,31 +67,39 @@ public class Simulator implements ShutdownManager {
Metrics metrics = new Metrics();
final ConsoleReporter consoleReporter = createConsoleReporter(metrics, plan.getMetricsReportInterval());
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- shutdown(collectors, "collectors", 5);
- shutdown(readers, "readers", 5);
- shutdown(aggregators, "aggregators", 1);
- shutdown(aggregationQueue, "aggregationQueue", Integer.MAX_VALUE);
- consoleReporter.stop();
- }
- });
-
createSchema(plan.getNodes(), plan.getCqlPort());
Session session = createSession(plan.getNodes(), plan.getCqlPort());
StorageSession storageSession = new StorageSession(session);
MetricsDAO metricsDAO = new MetricsDAO(storageSession, plan.getMetricsServerConfiguration());
- MetricsServer metricsServer = new MetricsServer();
+ final MetricsServer metricsServer = new MetricsServer();
metricsServer.setDAO(metricsDAO);
metricsServer.setConfiguration(plan.getMetricsServerConfiguration());
+ metricsServer.setAggregationBatchSize(plan.getAggregationBatchSize());
+ metricsServer.setUseAsyncAggregation(plan.getAggregationType() == SimulationPlan.AggregationType.ASYNC);
metricsServer.setDateTimeService(plan.getDateTimeService());
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown(collectors, "collectors", 5);
+ shutdown(readers, "readers", 5);
+ shutdown(aggregators, "aggregators", 1);
+ shutdown(aggregationQueue, "aggregationQueue", Integer.MAX_VALUE);
+ metricsServer.shutdown();
+ log.info("Wait for console reporter...");
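+ // 181s presumably sits just past the reporter interval so one final metrics report is printed before stopping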
+ try {
+ Thread.sleep(181000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ consoleReporter.stop();
+ }
+ });
+
MeasurementAggregator measurementAggregator = new MeasurementAggregator(metricsServer, this, metrics,
- aggregationQueue);
+ aggregationQueue, plan.getNumMeasurementCollectors() * plan.getBatchSize());
for (int i = 0; i < plan.getNumMeasurementCollectors(); ++i) {
collectors.scheduleAtFixedRate(new MeasurementCollector(plan.getBatchSize(),
@@ -169,6 +179,11 @@ public class Simulator implements ShutdownManager {
Cluster cluster = new ClusterBuilder().addContactPoints(nodes).withPort(cqlPort)
.withCredentials("rhqadmin", "rhqadmin")
.build();
+ PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 24);
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 24);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 32);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, 32);
log.debug("Created cluster object with " + cluster.getConfiguration().getProtocolOptions().getCompression()
+ " compression.");
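For context on the numbers above: with the driver version this module builds against, each connection multiplexes up to 128 concurrent requests (a protocol default, not something this patch changes), so these pool sizes roughly bound the in-flight requests per host. A hedged sketch of reading the configuration back:

    PoolingOptions pooling = cluster.getConfiguration().getPoolingOptions();
    int core = pooling.getCoreConnectionsPerHost(HostDistance.LOCAL); // 24 after this change
    int max = pooling.getMaxConnectionsPerHost(HostDistance.LOCAL);   // 32
    // roughly 24 * 128 = 3072 concurrent requests before the pool grows toward max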
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
index 33b3bbf..7ad50d1 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
@@ -56,6 +56,26 @@ public class SimulationPlan {
}
}
+ public enum AggregationType {
+ SYNC("sync"), ASYNC("async");
+
+ private final String text;
+
+ AggregationType(String text) {
+ this.text = text;
+ }
+
+ public static AggregationType fromText(String text) {
+ for (AggregationType type : values()) {
+ if (type.text.equals(text)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException(text + " is not a valid aggregation type");
+ }
+ }
+
private long collectionInterval;
private long aggregationInterval;
@@ -84,6 +104,10 @@ public class SimulationPlan {
private long simulationRate;
+ private int aggregationBatchSize;
+
+ private AggregationType aggregationType;
+
public DateTimeService getDateTimeService() {
return dateTimeService;
}
@@ -195,4 +219,20 @@ public class SimulationPlan {
public void setSimulationRate(long simulationRate) {
this.simulationRate = simulationRate;
}
+
+ public int getAggregationBatchSize() {
+ return aggregationBatchSize;
+ }
+
+ public void setAggregationBatchSize(int aggregationBatchSize) {
+ this.aggregationBatchSize = aggregationBatchSize;
+ }
+
+ public AggregationType getAggregationType() {
+ return aggregationType;
+ }
+
+ public void setAggregationType(AggregationType aggregationType) {
+ this.aggregationType = aggregationType;
+ }
}
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
index e601212..ff1d83f 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
@@ -98,6 +98,9 @@ public class SimulationPlanner {
simulation.setNodes(nodes);
simulation.setCqlPort(getInt(root.get("cqlPort"), 9142));
+ simulation.setAggregationBatchSize(getInt(root.get("aggregationBatchSize"), 250));
+ simulation.setAggregationType(SimulationPlan.AggregationType.fromText(getString(root.get("aggregationType"),
+ "sync")));
return simulation;
}
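The two new plan properties read here fall back to a 250-metric batch size and synchronous aggregation when the JSON plan omits them. A minimal sketch of the equivalent programmatic setup (the values are illustrative):

    SimulationPlan plan = new SimulationPlan();
    plan.setAggregationBatchSize(500); // overrides the 250 default applied above
    plan.setAggregationType(SimulationPlan.AggregationType.fromText("async"));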