Skip to content

Commit

Permalink
Added monitoring sources and channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Sep 19, 2024
1 parent 6c23e55 commit f137e76
Show file tree
Hide file tree
Showing 15 changed files with 2,366 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package org.aksw.commons.io.input;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Map.Entry;

import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonWriter;

public class ChannelMonitor {
public class RangeTracker {
Expand Down Expand Up @@ -84,4 +92,20 @@ public synchronized void submitReadStats(long readStartPos, long readEndPos, int
});
}
}

public void dumpJson(OutputStream out) throws IOException {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
try (JsonWriter w = gson.newJsonWriter(new OutputStreamWriter(out))) {
w.beginArray();
for (Entry<Range<Long>, RangeTracker> e : trackedReads.asMapOfRanges().entrySet()) {
w.beginObject();
w.name("offset");
w.value(e.getKey().lowerEndpoint());
w.name("length");
w.value(e.getValue().getMaxReadLength());
w.endObject();
}
w.endArray();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.aksw.commons.io.input;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonWriter;

public class ChannelMonitor2 {
public class RangeTracker {
protected long totalDurationNanos;
// protected Set<Integer> readLengths;
protected int minReadLength;
protected int maxReadLength;
protected long totalReadLength;
protected long readCount;

public RangeTracker() {
this(Integer.MAX_VALUE, 0, 0, 0, 0);
}

public RangeTracker(int readLength, long totalDurationNanos) {
this(readLength, readLength, readLength, totalDurationNanos, 1);
}

public RangeTracker(int minReadLength, int maxReadLength, long totalReadLength, long totalDurationNanos, long readCount) {
super();
this.totalDurationNanos = totalDurationNanos;
this.minReadLength = minReadLength;
this.maxReadLength = maxReadLength;
this.totalReadLength = totalReadLength;
this.readCount = readCount;
}

public long getTotalDurationNanos() {
return totalDurationNanos;
}

public int getMinReadLength() {
return minReadLength;
}

public long getTotalReadLength() {
return totalReadLength;
}

public int getMaxReadLength() {
return maxReadLength;
}

public long getReadCount() {
return readCount;
}

public void add(RangeTracker contrib) {
this.totalDurationNanos += contrib.totalDurationNanos;
this.maxReadLength = Math.max(this.maxReadLength, contrib.maxReadLength);
this.minReadLength = this.minReadLength == -1
? contrib.minReadLength
: Math.min(this.minReadLength, contrib.minReadLength);
this.totalReadLength += contrib.totalReadLength;
++this.readCount;
}

@Override
protected RangeTracker clone() {
return new RangeTracker(minReadLength, maxReadLength, totalReadLength, totalDurationNanos, readCount);
}
}

protected volatile NavigableMap<Long, RangeTracker> trackedReads = new TreeMap<>();
protected AtomicLong readCounter = new AtomicLong();
protected AtomicLong readAmount = new AtomicLong();

public void addReadAmount(long readAmount) {
this.readAmount.addAndGet(readAmount);
}

public void incReadCounter() {
this.readCounter.addAndGet(1);
}

public long getReadCounter() {
return readCounter.get();
}

public long getReadAmount() {
return readAmount.get();
}

public NavigableMap<Long, RangeTracker> getTrackedReads() {
return trackedReads;
}

public synchronized void submitReadStats(long offset, long readStartPos, long readEndPos, int readLength, long durationNanos) {
if (readLength > 0) { // Skip lengths that are <= 0
RangeTracker tracker = trackedReads.computeIfAbsent(offset, o -> new RangeTracker());
RangeTracker contrib = new RangeTracker(readLength, durationNanos);
tracker.add(contrib);
}
}

public void dumpJson(OutputStream out) throws IOException {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
try (JsonWriter w = gson.newJsonWriter(new OutputStreamWriter(out))) {
w.beginArray();
for (Entry<Long, RangeTracker> e : getTrackedReads().entrySet()) {
w.beginObject();
w.name("offset");
w.value(e.getKey());
w.name("length");
w.value(e.getValue().getTotalReadLength());
w.endObject();
}
w.endArray();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.io.IOException;

import com.google.common.collect.Range;

public interface SeekableReadableChannelSource<A>
extends ReadableChannelSource<A>
{
Expand All @@ -15,6 +17,16 @@ default SeekableReadableChannel<A> newReadableChannel(long offset) throws IOExce
return result;
}

@Override
default SeekableReadableChannel<A> newReadableChannel(long start, long end) throws IOException {
throw new UnsupportedOperationException();
}

@Override
default SeekableReadableChannel<A> newReadableChannel(Range<Long> range) throws IOException {
throw new UnsupportedOperationException();
}


// return newReadableChannel(0l);
// }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
public class SeekableReadableChannelWithMonitor<A, X extends SeekableReadableChannel<A>>
extends SeekableReadableChannelDecoratorBase<A, X>
{
protected ChannelMonitor monitor;
protected ChannelMonitor2 monitor;

public SeekableReadableChannelWithMonitor(X delegate, ChannelMonitor monitor) {
protected long cachedPos;
protected long relativeStart;
protected long readLength;

protected volatile long readCounter = 0;

public SeekableReadableChannelWithMonitor(X delegate, ChannelMonitor2 monitor) {
super(delegate);
this.monitor = Objects.requireNonNull(monitor);
}
Expand All @@ -17,18 +23,24 @@ public SeekableReadableChannelWithMonitor(X delegate, ChannelMonitor monitor) {
public int read(A array, int position, int length) throws IOException {
// Include positioning in the time so that long times may be discovered
long startTimestamp = System.nanoTime();
long startPos = super.position();
int result = super.read(array, position, length);
long endPos = super.position();
long endTimestamp = System.nanoTime();
long duration = endTimestamp - startTimestamp;
monitor.submitReadStats(startPos, endPos, result, duration);

if (result > 0) {
// long endPos = super.position();
long endTimestamp = System.nanoTime();
long duration = endTimestamp - startTimestamp;
long nextStart = relativeStart + result;
// System.out.println(String.format("Read #%d: pos %d len: %d", ++readCounter, cachedPos, result)) ;
monitor.incReadCounter();
monitor.addReadAmount(result);
// monitor.submitReadStats(cachedPos, relativeStart, nextStart, result, duration);
relativeStart = nextStart;
}
return result;
}

@Override
public void position(long pos) throws IOException {
this.cachedPos = pos;
super.position(pos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.aksw.commons.io.input;

import java.util.Objects;

public class SeekableReadableSourceWithMonitor<A, X extends SeekableReadableChannelSource<A>>
extends SeekableReadableSourceWrapperBase<A, X>
{
protected ChannelMonitor2 channelMonitor;

public SeekableReadableSourceWithMonitor(X delegate) {
this(delegate, new ChannelMonitor2());
}

public ChannelMonitor2 getChannelMonitor() {
return channelMonitor;
}

public SeekableReadableSourceWithMonitor(X delegate,
ChannelMonitor2 channelMonitor) {
super(Objects.requireNonNull(delegate));
this.channelMonitor = Objects.requireNonNull(channelMonitor);
}

@Override
protected SeekableReadableChannel<A> wrap(SeekableReadableChannel<A> delegate) {
return new SeekableReadableChannelWithMonitor<>(delegate, channelMonitor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.aksw.commons.io.input;

import java.io.IOException;

import org.aksw.commons.io.buffer.array.ArrayOps;

import com.google.common.collect.Range;

public abstract class SeekableReadableSourceWrapperBase<A, X extends SeekableReadableChannelSource<A>>
implements SeekableReadableChannelSource<A>
{
protected X delegate;

public SeekableReadableSourceWrapperBase(X delegate) {
super();
this.delegate = delegate;
}

public X getDelegate() {
return delegate;
}

@Override
public long size() throws IOException {
return getDelegate().size();
}

@Override
public ArrayOps<A> getArrayOps() {
return getDelegate().getArrayOps();
}

@Override
public SeekableReadableChannel<A> newReadableChannel() throws IOException {
return wrap(getDelegate().newReadableChannel());
}

@Override
public SeekableReadableChannel<A> newReadableChannel(long start) throws IOException {
SeekableReadableChannel<A> result = newReadableChannel();
result.position(start);
return result;
// return wrap(getDelegate().newReadableChannel(start));
}

@Override
public SeekableReadableChannel<A> newReadableChannel(long start, long end) throws IOException {
SeekableReadableChannel<A> result = newReadableChannel();
result.position(start);
return result;
// return wrap(getDelegate().newReadableChannel(start, end));
}

@Override
public SeekableReadableChannel<A> newReadableChannel(Range<Long> range) throws IOException {
throw new UnsupportedOperationException();
// SeekableReadableChannel<A> result = newReadableChannel();
// result.position(range.low);
// return result;
// return wrap(getDelegate().newReadableChannel(range));
}

protected abstract SeekableReadableChannel<A> wrap(SeekableReadableChannel<A> delegate);
}

This file was deleted.

Loading

0 comments on commit f137e76

Please sign in to comment.