Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/databrowser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

import org.csstudio.trends.databrowser3.Activator;
Expand Down Expand Up @@ -71,7 +70,7 @@
* Instead of directly accessing the archive, ArchiveFetchJob launches
* a WorkerThread for the actual archive access, so that the Job
* can then poll the progress monitor for cancellation and if
* necessary interrupt the WorkerThread which might be 'stuck'
* necessary cancel the archive reader when it is 'stuck'
* in a long running operation.
*/
class WorkerThread implements Runnable
Expand All @@ -80,7 +79,7 @@
private volatile boolean cancelled = false;

/** Archive reader that's currently queried */
private AtomicReference<ArchiveReader> reader = new AtomicReference<>();
private volatile ArchiveReader reader;

/** @return Message that somehow indicates progress */
public String getMessage()
Expand All @@ -92,10 +91,9 @@
public void cancel()
{
cancelled = true;

final ArchiveReader the_reader = reader.get();
if (the_reader != null)
the_reader.cancel();
final ArchiveReader r = reader;
if (r != null)
r.cancel();
}

/** {@inheritDoc} */
Expand All @@ -117,51 +115,37 @@

final Collection<ArchiveDataSource> archives = item.getArchiveDataSources();
final List<ArchiveDataSource> archives_without_channel = new ArrayList<>();
final int bins_final = bins;
int i = 0;
for (ArchiveDataSource archive : archives)
{
if (cancelled)
break;
final String url = archive.getUrl();
// Display "N/total", using '1' for the first sub-archive.
message = MessageFormat.format(Messages.ArchiveFetchDetailFmt,
archive.getName(), ++i, archives.size());

try
(
final ArchiveReader the_reader = ArchiveReaders.createReader(url);
)
{
reader.set(the_reader);
try
(
final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW)
? the_reader.getRawValues(item.getResolvedName(), start, end)
: the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins)
)
final List<VType> fetched = fetchFromSource(archive, bins_final);
if (!cancelled)
{
// Get samples into array
final List<VType> result = new ArrayList<>();
while (value_iter.hasNext())
result.add(value_iter.next());
samples += result.size();
item.mergeArchivedSamples(archive.getName(), result);
}
catch (UnknownChannelException e)
{
// Do not immediately notify about unknown channels. First search for the data in all archive
// sources and only report this kind of errors at the end
archives_without_channel.add(archive);
}
finally
{
reader.set(null);
samples += fetched.size();
item.mergeArchivedSamples(archive.getName(), fetched);
}
}
catch (UnknownChannelException ex)
{
Comment thread
jacomago marked this conversation as resolved.
// Do not immediately notify about unknown channels. First search for the data in all archive
// sources and only report this kind of errors at the end
archives_without_channel.add(archive);
}
catch (Exception ex)
{ // Tell listener unless it's the result of a 'cancel'?
if (! cancelled)
{
logger.log(Level.WARNING, ex,
() -> "Archive fetch failed for source: " + archive.getName());
if (!cancelled)
listener.archiveFetchFailed(ArchiveFetchJob.this, archive, ex);
// Continue with the next data source
}
}
final long end_time = System.currentTimeMillis();
Expand All @@ -180,6 +164,35 @@
listener.fetchCompleted(ArchiveFetchJob.this);
}

/** Fetch all samples from one archive source.
* Runs directly on WorkerThread, timed by the outer polling loop.
* @return list of samples
* @throws Exception on fetch error
*/
List<VType> fetchFromSource(final ArchiveDataSource archive, final int bins) throws Exception
{
try (final ArchiveReader the_reader = openReader(archive.getUrl()))

Check warning on line 174 in app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this local variable to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=ControlSystemStudio_phoebus&issues=AZ5k0Xsoj_7NZQq5cEFq&open=AZ5k0Xsoj_7NZQq5cEFq&pullRequest=3807
{
reader = the_reader;
try
(
final ValueIterator value_iter = (item.getRequestType() == RequestType.RAW)
? the_reader.getRawValues(item.getResolvedName(), start, end)
: the_reader.getOptimizedValues(item.getResolvedName(), start, end, bins)
)
{
final List<VType> result = new ArrayList<>();
while (value_iter.hasNext())
result.add(value_iter.next());
return result;
}
finally
{
reader = null;
}
}
}

@Override
public String toString()
{
Expand All @@ -205,6 +218,25 @@
this.job = JobManager.schedule(toString(), this);
}

/** Test-only constructor: does not schedule via JobManager. */
ArchiveFetchJob(final PVItem item, final Instant start, final Instant end,
final ArchiveFetchJobListener listener, final boolean testOnly)

Check warning on line 223 in app/databrowser/src/main/java/org/csstudio/trends/databrowser3/archive/ArchiveFetchJob.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused method parameter "testOnly".

See more on https://sonarcloud.io/project/issues?id=ControlSystemStudio_phoebus&issues=AZ5k0Xsoj_7NZQq5cEFu&open=AZ5k0Xsoj_7NZQq5cEFu&pullRequest=3807
{
this.item = item;
this.start = start;
this.end = end;
this.listener = listener;
this.job = null;
}

/** Create an {@link ArchiveReader} for the given URL.
* Override in tests to inject fakes.
*/
protected ArchiveReader openReader(final String url) throws Exception
{
return ArchiveReaders.createReader(url);
}

/** @return PVItem for which this job was created */
public PVItem getPVItem()
{
Expand Down Expand Up @@ -248,6 +280,8 @@
final Future<?> done = Activator.thread_pool.submit(worker);
// Poll worker and progress monitor
long start = System.currentTimeMillis();
String lastSourceMessage = "";
long sourceStartTime = System.currentTimeMillis();
while (!done.isDone())
{ // Wait until worker is done, or time out to update info message
try
Expand All @@ -258,13 +292,19 @@
{
// Ignore
}
final String currentMessage = worker.getMessage();
if (!currentMessage.equals(lastSourceMessage))
{
lastSourceMessage = currentMessage;
sourceStartTime = System.currentTimeMillis();
}
final long seconds = (System.currentTimeMillis() - start) / 1000;
final String info = MessageFormat.format(Messages.ArchiveFetchProgressFmt,
worker.getMessage(), seconds);
currentMessage, seconds);
monitor.updateTaskName(info);
// Try to cancel the worker in response to user's cancel request.
// Continues to cancel the worker until isDone()
if (monitor.isCanceled())
if (monitor.isCanceled()
|| (Preferences.archive_read_timeout_ms > 0
&& System.currentTimeMillis() - sourceStartTime > Preferences.archive_read_timeout_ms))
worker.cancel();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
/** Setting */
@Preference public static int archive_fetch_delay;
/** Setting */
@Preference public static int archive_read_timeout_ms;

Check warning on line 57 in app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this field "archive_read_timeout_ms" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=ControlSystemStudio_phoebus&issues=AZ5k0XyGj_7NZQq5cEFw&open=AZ5k0XyGj_7NZQq5cEFw&pullRequest=3807

Check warning on line 57 in app/databrowser/src/main/java/org/csstudio/trends/databrowser3/preferences/Preferences.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make this "public static archive_read_timeout_ms" field final

See more on https://sonarcloud.io/project/issues?id=ControlSystemStudio_phoebus&issues=AZ5k0XyGj_7NZQq5cEFv&open=AZ5k0XyGj_7NZQq5cEFv&pullRequest=3807
/** Setting */
@Preference public static int concurrent_requests;
/** Setting */
@Preference public static ArchiveRescale archive_rescale;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ trace_type=AREA
# while interactively zooming and panning
archive_fetch_delay=500

# Per-source read timeout in milliseconds.
#
# Each archive source (appliance, channel archiver, RDB, ...) in the
# WorkerThread fetch loop is given this much time to return data.
# When the source does not respond within the timeout the fetch is
# abandoned, a warning is logged, the failure is reported to the
# listener, and the loop continues with the next source.
#
# A value of 0 disables the timeout (wait forever).
# 30 s covers scalar and waveform PVs on a local/campus network.
# Increase this in settings.ini for WAN deployments.
archive_read_timeout_ms=30000

# Number of concurrent archive fetch requests.
#
# When more requests are necessary, the background jobs
Expand Down
Loading
Loading