Checking message latency and ordering is a common task in remote messaging systems with failover: you can miss a few messages when the messaging system fails over, or receive stale ones because of network issues or bugs in the messaging system itself.
Latency checks are easy: in the end you just compare two timestamps.
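A latency check can be as small as the sketch below. It assumes each message carries a send timestamp in epoch milliseconds and that sender and receiver clocks are reasonably in sync; the class name, method name and threshold are hypothetical and not part of the original code.

```java
final class LatencyCheck {
    /** Returns true when the message took longer than maxLatencyMillis to arrive. */
    static boolean isTooLate(long sentAtMillis, long receivedAtMillis, long maxLatencyMillis) {
        return receivedAtMillis - sentAtMillis > maxLatencyMillis;
    }

    public static void main(String[] args) {
        long sentAt = 1_000L; // hypothetical send timestamp
        System.out.println(isTooLate(sentAt, 1_040L, 50L)); // 40 ms in transit, within the limit
        System.out.println(isTooLate(sentAt, 1_200L, 50L)); // 200 ms in transit, over the limit
    }
}
```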
Ordering checks are the problem: there will certainly be race conditions, but locks or synchronization would most likely kill performance and scalability.
So we need some kind of non-blocking algorithm, and CAS-based atomics should help.
Let's specify the requirements. We need to:
- track missed messages (when a forward message arrives)
- ignore stale messages (when an old message arrives)
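Leaving concurrency aside for a moment, the two requirements boil down to comparing the received counter with the last one seen. The sketch below shows only that rule; the enum and method names are made up for illustration.

```java
final class SequenceCheck {
    enum Kind { IN_ORDER, STALE, FORWARD }

    /** Classifies a received counter against the last counter seen. */
    static Kind classify(long lastSeen, long received) {
        long expected = lastSeen + 1;
        if (received == expected) {
            return Kind.IN_ORDER;
        }
        return received < expected ? Kind.STALE : Kind.FORWARD;
    }

    /** Number of messages skipped when a forward message arrives, 0 otherwise. */
    static long missedCount(long lastSeen, long received) {
        return Math.max(0L, received - (lastSeen + 1));
    }
}
```

The hard part, as the rest of the post shows, is keeping lastSeen consistent when several threads apply this rule at once.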
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger; // assumption: log4j 1.x, inferred from Logger.getLogger(Class) and log.error(String)

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static final Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
    // counter of the last message seen
    private final AtomicLong messageCounter = new AtomicLong(0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // atomically publish the received counter and compute the one we expected
        final long expected = messageCounter.getAndSet(message.getCounter()) + 1;
        if (expected == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expected > message.getCounter()) { // stale message
            /* wrong message, attempt to restore the sequence to prevent an error on next good message
             * TODO: fix ABA problem here
             */
            messageCounter.compareAndSet(message.getCounter(), expected - 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s!",
                    message.getCounter(), expected));
            // some other notifying stuff...
        } else if (expected < message.getCounter()) { // missed messages
            log.error(String.format("got forward message %s while expected %s, missed: %s",
                    message.getCounter(), expected, message.getCounter() - expected));
            // some other notifying stuff...
        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }
}
So we have an abstract message with a counter field for ordering; we use CAS to track it and notify when messages are missed or stale.
This looks good and works 99.99% of the time, but the ordering can occasionally get messed up because of the ABA problem in the compareAndSet call of the stale-message branch (the one marked with the TODO). Consider the message sequence 1, 2, 3, 4, 2: we have updated messageCounter to 4 and then to 2, and one thread (A) is about to execute that compareAndSet.
At the same moment two more messages arrive: 5 (correct) and 2 (stale again). Other threads update messageCounter to 5, raising a missed-message notification (the first false notification), and then to 2. Only after that does thread A execute its compareAndSet, and since it finds messageCounter = 2, it atomically sets it back to 4. So when the next correct message, 6, arrives, we report message 5 as missed (the second false notification) although it had already arrived.
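The interleaving is easier to follow when replayed step by step. The sketch below is not a real concurrency test: it runs the operations of the three threads on a single thread, in exactly the order described above, against a plain AtomicLong. The class name and the result layout are invented for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class AbaReplay {
    /** Returns {falseMissedReports, threadARestored ? 1 : 0, threadCRestored ? 1 : 0}. */
    static int[] replay() {
        AtomicLong counter = new AtomicLong(0);
        int falseMissed = 0;

        // Messages 1..4 arrive in order.
        for (long m = 1; m <= 4; m++) {
            counter.getAndSet(m);
        }

        // Thread A: stale message 2. getAndSet is done, the restoring CAS is still pending.
        long expectedA = counter.getAndSet(2) + 1;   // expectedA == 5, counter == 2

        // Thread B: correct message 5 sees counter == 2 and therefore expects 3.
        long expectedB = counter.getAndSet(5) + 1;   // expectedB == 3, counter == 5
        if (expectedB < 5) {
            falseMissed++;                           // first false "missed" report
        }

        // Thread C: stale message 2 again.
        long expectedC = counter.getAndSet(2) + 1;   // expectedC == 6, counter == 2

        // Thread A resumes: the counter is 2 again, so its CAS succeeds (ABA) and restores 4.
        boolean restoredA = counter.compareAndSet(2, expectedA - 1);
        // Thread C's own restore fails because the counter is no longer 2.
        boolean restoredC = counter.compareAndSet(2, expectedC - 1);

        // Correct message 6: the counter says 4, so message 5 is reported as missed.
        long expectedD = counter.getAndSet(6) + 1;   // expectedD == 5
        if (expectedD < 6) {
            falseMissed++;                           // second false "missed" report
        }
        return new int[] { falseMissed, restoredA ? 1 : 0, restoredC ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = replay();
        System.out.println("false missed reports: " + r[0]
                + ", thread A restored: " + (r[1] == 1)
                + ", thread C restored: " + (r[2] == 1));
    }
}
```

The wrong thread wins the restore: A puts back a value that was already out of date, and C, which held the right one, is rejected.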
Let's fix it. The JDK has a handy AtomicStampedReference class designed against ABA issues, but unfortunately it has no getAndSet method.
We have to implement it ourselves; it is not very complex, by analogy with AtomicLong:
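private StampedReferencePairPub getAndSetMessageCounter(final Long newValue) {
    final int[] stampHolder = new int[1];
    while (true) {
        // get() reads the reference and its stamp as one consistent pair
        final Long currentRef = messageCounter.get(stampHolder);
        // compareAndSet matches references by identity (==), so pass the very object we just read
        if (messageCounter.compareAndSet(currentRef, newValue, stampHolder[0], stampHolder[0] + 1))
            return new StampedReferencePairPub(currentRef, stampHolder[0]);
    }
}
...
public static class StampedReferencePairPub {
    public final Long ref; // kept boxed so the same instance can be handed back to compareAndSet
    public final int stamp;
    StampedReferencePairPub(Long r, int i) {
        ref = r; stamp = i;
    }
}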
Yes, we have to re-declare an equivalent of ReferenceIntegerPair (named Pair in later JDK versions), because that class is a private inner class of AtomicStampedReference.
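One detail is easy to miss when the reference type is Long: AtomicStampedReference.compareAndSet compares the expected reference with ==, not with equals. A Long that was boxed separately only matches if it happens to be the same object, which Long.valueOf guarantees only for the small cached range (-128 to 127). The sketch below illustrates this; the class and method names are invented for the demonstration.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class StampedIdentityDemo {
    /** CAS that passes the very object read back from the reference. */
    static boolean casWithSameInstance(long value) {
        AtomicStampedReference<Long> ref =
                new AtomicStampedReference<>(Long.valueOf(value), 0);
        int[] stamp = new int[1];
        Long current = ref.get(stamp); // reference and stamp read as one pair
        return ref.compareAndSet(current, Long.valueOf(value + 1), stamp[0], stamp[0] + 1);
    }

    /** CAS that passes a separately boxed Long; returns {sameObject, swapped}. */
    static boolean[] casWithSeparateBox(long value) {
        Long stored = Long.valueOf(value);
        Long separate = Long.valueOf(value);
        AtomicStampedReference<Long> ref = new AtomicStampedReference<>(stored, 0);
        boolean sameObject = stored == separate; // identity check, on purpose
        boolean swapped = ref.compareAndSet(separate, Long.valueOf(value + 1), 0, 1);
        return new boolean[] { sameObject, swapped };
    }

    public static void main(String[] args) {
        System.out.println(casWithSameInstance(1000L));
        boolean[] small = casWithSeparateBox(5L);    // inside the Long cache
        boolean[] large = casWithSeparateBox(1000L); // usually outside the Long cache
        System.out.println(small[0] + " " + small[1]);
        System.out.println(large[0] + " " + large[1]);
    }
}
```

In both calls the swap succeeds exactly when the two boxes are the same object, so code that works for small test counters can start failing once real counters grow past 127.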
After some refactoring, the final version is:
import java.util.concurrent.atomic.AtomicStampedReference;
import org.apache.log4j.Logger; // assumption: log4j 1.x, inferred from Logger.getLogger(Class) and log.error(String)

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static final Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);
    // counter of the last message seen, plus a stamp that changes on every update
    private final AtomicStampedReference<Long> messageCounter = new AtomicStampedReference<Long>(0L, 0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // box once: AtomicStampedReference compares references by identity, so the restoring
        // CAS below has to pass the same Long instance that is stored here
        final Long received = Long.valueOf(message.getCounter());
        final StampedReferencePairPub current = getAndSetMessageCounter(received);
        final long expectedCounter = current.ref + 1;
        if (expectedCounter == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expectedCounter > message.getCounter()) {
            /* ignore stale message: attempt to restore the sequence to prevent an error on next good message
             */
            final int expectedStamp = current.stamp + 1;
            // succeeds only if nobody has touched the counter since our getAndSet
            boolean restored = messageCounter.compareAndSet(received, current.ref, expectedStamp, expectedStamp + 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s! Sequence restored: %s",
                    message.getCounter(), expectedCounter, restored));
            // some other notifying stuff...
        } else if (expectedCounter < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, probably missed: %s",
                    message.getCounter(), expectedCounter, message.getCounter() - expectedCounter));
            // some other notifying stuff...
        }
        return false;
    }
    // getAndSetMessageCounter, StampedReferencePairPub and processBusinessStuff as shown earlier
}
That's it! Now at most one false missed-message notification can follow a stale message, and only with bad timing under high concurrency.
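To see why only one false notification remains, the interleaving 1, 2, 3, 4, 2 followed by 5, 2 and 6 can be replayed against a stamped counter. As before, this is a single-threaded replay in a fixed order rather than a concurrency test, and the class, the Prev holder and the local getAndSet helper are written for this illustration only.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class StampedReplay {
    /** Previous reference and stamp, as returned by getAndSet below. */
    static final class Prev {
        final Long ref;
        final int stamp;
        Prev(Long ref, int stamp) { this.ref = ref; this.stamp = stamp; }
    }

    static Prev getAndSet(AtomicStampedReference<Long> counter, Long newValue) {
        int[] stamp = new int[1];
        while (true) {
            Long ref = counter.get(stamp); // reference and stamp read as one pair
            if (counter.compareAndSet(ref, newValue, stamp[0], stamp[0] + 1)) {
                return new Prev(ref, stamp[0]);
            }
        }
    }

    /** Returns {falseMissedReports, threadARestored ? 1 : 0, threadCRestored ? 1 : 0}. */
    static int[] replay() {
        AtomicStampedReference<Long> counter = new AtomicStampedReference<>(0L, 0);
        int falseMissed = 0;
        for (long m = 1; m <= 4; m++) {
            getAndSet(counter, m);                   // in-order messages, stamp ends at 4
        }

        Long staleA = 2L;
        Prev prevA = getAndSet(counter, staleA);     // thread A: was (4, stamp 4), now (2, stamp 5)

        Long five = 5L;
        Prev prevB = getAndSet(counter, five);       // thread B: was (2, stamp 5), now (5, stamp 6)
        if (prevB.ref + 1 < five) {
            falseMissed++;                           // the one remaining false "missed" report
        }

        Long staleC = 2L;
        Prev prevC = getAndSet(counter, staleC);     // thread C: was (5, stamp 6), now (2, stamp 7)

        // Thread A resumes: the value is 2 again, but the stamp is 7, not 5, so the CAS fails.
        boolean restoredA = counter.compareAndSet(staleA, prevA.ref, prevA.stamp + 1, prevA.stamp + 2);
        // Thread C still sees its own stamp 7 and restores 5.
        boolean restoredC = counter.compareAndSet(staleC, prevC.ref, prevC.stamp + 1, prevC.stamp + 2);

        Prev prevD = getAndSet(counter, 6L);         // message 6 finds 5, which is what it expects
        if (prevD.ref + 1 < 6) {
            falseMissed++;
        }
        return new int[] { falseMissed, restoredA ? 1 : 0, restoredC ? 1 : 0 };
    }
}
```

The stamp lets thread A notice that the counter changed under it even though the value looks the same, so the outdated restore is rejected and message 6 is accepted as in order.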
It is quite a vivid example of how simple things get complicated in non-blocking code; consider how complex a fully correct algorithm would be! =)
+Full sources with JUnit tests, +SO question