Skip to content

Commit

Permalink
better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Nov 3, 2023
1 parent 2864dcc commit d9ec92f
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static MessageAdapter defaultMessageAdapter(Context<Member> context, Dige
}

public void clearBuffer() {
log.warn("Clearing message buffer on: {}", member);
log.warn("Clearing message buffer on: {}", member.getId());
buffer.clear();
}

Expand All @@ -147,7 +147,6 @@ public void publish(Message message, boolean notifyLocal) {
if (!started.get()) {
return;
}
log.debug("publishing message on: {}", member.getId());
AgedMessage m = buffer.send(Any.pack(message), member);
if (notifyLocal) {
deliver(Collections.singletonList(
Expand Down Expand Up @@ -201,7 +200,7 @@ private void deliver(List<Msg> newMsgs) {
if (newMsgs.isEmpty()) {
return;
}
log.debug("Delivering: {} msgs for context: {} on: {} ", newMsgs.size(), context.getId(), member.getId());
log.debug("delivering: {} on: {}", newMsgs.size(), member.getId());
channelHandlers.values().forEach(handler -> {
try {
handler.message(context.getId(), newMsgs);
Expand Down Expand Up @@ -428,7 +427,7 @@ public void receive(List<AgedMessage> messages) {
if (messages.size() == 0) {
return;
}
log.trace("receiving: {} msgs on: {}", messages.size(), member);
log.trace("receiving: {} msgs on: {}", messages.size(), member.getId());
deliver(messages.stream()
.limit(params.maxMessages)
.map(am -> new state(adapter.hasher.apply(am.getContent()), AgedMessage.newBuilder(am)))
Expand All @@ -451,7 +450,7 @@ public Iterable<? extends AgedMessage> reconcile(BloomFilter<Digest> biff, Diges
.forEach(s -> mailBox.add(s.msg));
List<AgedMessage> reconciled = mailBox.stream().limit(params.maxMessages).map(b -> b.build()).toList();
if (!reconciled.isEmpty()) {
log.trace("reconciled: {} for: {} on: {}", reconciled.size(), from, member);
log.trace("reconciled: {} for: {} on: {}", reconciled.size(), from, member.getId());
}
return reconciled;
}
Expand All @@ -465,7 +464,7 @@ public AgedMessage send(Any msg, SigningMember member) {
var hash = adapter.hasher.apply(message.getContent());
state s = new state(hash, message);
state.put(hash, s);
log.trace("Send message:{} on: {}", hash, member);
log.trace("Send message:{} on: {}", hash, member.getId());
return s.msg.build();
}

Expand All @@ -477,21 +476,24 @@ public void tick() {
round.incrementAndGet();
if (!tickGate.tryAcquire()) {
log.trace("Unable to acquire tick gate for: {} tick already in progress on: {}", context.getId(),
member);
member.getId());
return;
}
try {
var trav = state.entrySet().iterator();
int gcd = 0;
while (trav.hasNext()) {
var next = trav.next().getValue();
int age = next.msg.getAge();
if (age >= maxAge) {
trav.remove();
log.trace("GC'ing: {} age: {} > {} on: {}", next.hash, age + 1, maxAge, member.getId());
gcd++;
} else {
next.msg.setAge(age + 1);
}
}
if (gcd != 0)
log.trace("GC'ing: {} on: {}", gcd, member.getId());
} finally {
tickGate.release();
}
Expand All @@ -511,7 +513,7 @@ private boolean dup(state s) {
} else if (previous.msg.getAge() != nextAge) {
previous.msg().setAge(nextAge);
}
log.trace("duplicate event: {} on: {}", s.hash, member.getId());
// log.trace("duplicate event: {} on: {}", s.hash, member.getId());
return true;
}
return delivered.contains(s.hash);
Expand All @@ -530,7 +532,7 @@ private void gc() {
purgeTheAged();
if (buffer.size() > params.bufferSize) {
log.warn("Buffer overflow: {} > {} after compact for: {} on: {} ", buffer.size(), params.bufferSize,
context.getId(), member);
context.getId(), member.getId());
}
int freed = startSize - state.size();
if (freed > 0) {
Expand All @@ -542,7 +544,6 @@ private void gc() {
}

private void purgeTheAged() {
log.debug("Purging the aged of: {} buffer size: {} on: {}", context.getId(), size(), member.getId());
Queue<state> candidates = new PriorityQueue<>(
Collections.reverseOrder((a, b) -> Integer.compare(a.msg.getAge(), b.msg.getAge())));
candidates.addAll(state.values());
Expand All @@ -551,7 +552,6 @@ private void purgeTheAged() {
var m = processing.next();
if (m.msg.getAge() > maxAge) {
state.remove(m.hash);
log.trace("GC'ing: {} age: {} > {} on: {}", m.hash, m.msg.getAge() + 1, maxAge, member.getId());
} else {
break;
}
Expand Down

0 comments on commit d9ec92f

Please sign in to comment.