A few days ago I came across an interesting article about various approaches to monitoring Java socket traffic:
[JavaSpecialists 169] - Monitoring Sockets
What brought this article to my attention (apart from the well-known name of Dr. Heinz M. Kabutz) was the fact that monitoring Java sockets is just the right task for BTrace, a tool I happen to be involved with.
Indeed, Dr. Kabutz does mention BTrace among the various tools he would use to achieve the objective, but unfortunately without an example. I took this as a great opportunity to show off :) and give the audience a working example of monitoring Java sockets using only BTrace.
package socket;
import com.sun.btrace.AnyType;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.annotations.*;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static com.sun.btrace.BTraceUtils.*;
@BTrace
public class SocketMonitor {
    /**
     * Binds each socket stream to the Socket it was obtained from
     */
    private static Map streamMap = newWeakMap();

    /**
     * A helper aggregation instance summing Data-In and Data-Out per socket
     */
    private static Aggregation socketDataSum = newAggregation(AggregationFunction.SUM);

    /**
     * dataIn will get published as a JMX property of the btrace/SocketMonitor bean
     */
    @Property
    private static AtomicLong dataIn = newAtomicLong(0L);

    /**
     * dataOut will get published as a JMX property of the btrace/SocketMonitor bean
     */
    @Property
    private static AtomicLong dataOut = newAtomicLong(0L);

    /**
     * A thread safe helper variable to keep the instance of a Socket
     * that getInput/OutputStream method is applied to
     */
    @TLS
    private static Socket currentSocket;

    /**
     * A thread safe helper variable to keep the instance of an InputStream
     * that read method is applied to
     */
    @TLS
    private static InputStream currentInputStream;

    /**
     * Intercept the entry to the getInput/OutputStream method call
     * Store the Socket instance in currentSocket variable
     */
    @OnMethod(clazz = "+java.net.Socket", method = "/get(Input|Output)Stream/", location = @Location(Kind.ENTRY))
    public static void onGetStreamEntry(Socket self) {
        currentSocket = self;
    }

    /**
     * Intercept the normal exit of the getInputStream method call
     * Upon the exit the instance of the created InputStream is known
     * so we can bind it with the Socket instance used to obtain the stream
     */
    @OnMethod(clazz = "+java.net.Socket", method = "getInputStream", location = @Location(Kind.RETURN))
    public static void onInputStream(InputStream stream) {
        put(streamMap, stream, currentSocket);
        currentSocket = null;
    }

    /**
     * Intercept the normal exit of the getOutputStream method call
     * Upon the exit the instance of the created OutputStream is known
     * so we can bind it with the Socket instance used to obtain the stream
     */
    @OnMethod(clazz = "+java.net.Socket", method = "getOutputStream", location = @Location(Kind.RETURN))
    public static void onOutputStream(OutputStream stream) {
        put(streamMap, stream, currentSocket);
        currentSocket = null;
    }

    /**
     * Store the InputStream instance used in the read method call
     */
    @OnMethod(clazz = "+java.io.InputStream", method = "read", location = @Location(Kind.ENTRY))
    public static void onRead(AnyType[] args) {
        InputStream self = (InputStream) args[0];
        if (containsKey(streamMap, self)) {
            currentInputStream = self;
        } else {
            currentInputStream = null;
        }
    }

    /**
     * Use the stored InputStream instance to get hold of the defining Socket instance.
     * Then use the byte count available as the result of the method call to update
     * the aggregation and total values.
     * Note: the no-argument read() returns the byte value itself rather than a count,
     * so the number added for that overload is not a byte count.
     */
    @OnMethod(clazz = "+java.io.InputStream", method = "read", location = @Location(Kind.RETURN))
    public static void countReadData(int count) {
        if (count > -1 && currentInputStream != null) {
            // streamMap is a raw Map, so the lookup result needs a cast
            Socket sck = (Socket) get(streamMap, currentInputStream);
            addAndGet(dataIn, count);
            addToAggregation(socketDataSum, newAggregationKey(str(sck), "Input"), count);
            currentInputStream = null;
        }
    }

    /**
     * The following three methods intercept and process three different forms of the write method call.
     * The separation is necessary to be able to get hold of strongly typed parameters which
     * we can use in order to extract valuable information
     */
    @OnMethod(clazz = "+java.io.OutputStream", method = "write", location = @Location(Kind.ENTRY))
    public static void onWrite(Object self, int b) { // "byte" is a reserved word and cannot be a parameter name
        if (containsKey(streamMap, self)) {
            Socket sck = (Socket) get(streamMap, self);
            addAndGet(dataOut, 1L);
            addToAggregation(socketDataSum, newAggregationKey(str(sck), "Output"), 1);
        }
    }

    @OnMethod(clazz = "+java.io.OutputStream", method = "write", location = @Location(Kind.ENTRY))
    public static void onWrite(Object self, byte[] data) {
        if (containsKey(streamMap, self)) {
            Socket sck = (Socket) get(streamMap, self);
            addAndGet(dataOut, data.length);
            addToAggregation(socketDataSum, newAggregationKey(str(sck), "Output"), data.length);
        }
    }

    @OnMethod(clazz = "+java.io.OutputStream", method = "write", location = @Location(Kind.ENTRY))
    public static void onWrite(Object self, byte[] data, int offset, int length) {
        if (containsKey(streamMap, self)) {
            Socket sck = (Socket) get(streamMap, self);
            addAndGet(dataOut, length);
            addToAggregation(socketDataSum, newAggregationKey(str(sck), "Output"), length);
        }
    }

    /**
     * BTrace event handler - when the BTrace engine receives this event
     * it will dump the aggregation data to the client
     */
    @OnEvent("dump_stats")
    public static void dumpData() {
        printAggregation("Summary of the data read by socket", socketDataSum);
    }

    /**
     * BTrace event handler - when received the aggregation data as well as the totals
     * are reset
     */
    @OnEvent("clear_stats")
    public static void reset() {
        println("Resetting collected data...");
        clearAggregation(socketDataSum);
        set(dataIn, 0L);
        set(dataOut, 0L);
        println("Data reset");
    }
}
You can see that with relatively little code you can monitor the data flowing through the streams of every java.net.Socket in the application. The script takes advantage of the aggregations built into BTrace to sum the traffic per socket and per direction of data flow.
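If you have not met aggregations before, here is a rough plain-Java picture of what a SUM aggregation keyed by socket and direction boils down to. This is only my illustration, not BTrace code and not how BTrace implements it: the class SumAggregationSketch and its add/total methods are made-up names, and the socket strings are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical plain-Java stand-in for a SUM aggregation keyed by (socket, direction). */
public class SumAggregationSketch {
    // One running sum per composite key; the key is the pair [socket, direction]
    private final Map<List<String>, LongAdder> sums = new ConcurrentHashMap<>();

    /** Adds a byte count to the bucket identified by socket and direction. */
    public void add(String socket, String direction, long bytes) {
        sums.computeIfAbsent(Arrays.asList(socket, direction), k -> new LongAdder()).add(bytes);
    }

    /** Returns the running total for one bucket, or 0 if nothing was recorded for it. */
    public long total(String socket, String direction) {
        LongAdder adder = sums.get(Arrays.asList(socket, direction));
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        SumAggregationSketch agg = new SumAggregationSketch();
        agg.add("socket-A", "Input", 512);   // placeholder socket names
        agg.add("socket-A", "Input", 256);
        agg.add("socket-A", "Output", 64);
        System.out.println("socket-A Input:  " + agg.total("socket-A", "Input"));
        System.out.println("socket-A Output: " + agg.total("socket-A", "Output"));
    }
}
```

In the script above the same role is played by newAggregationKey(str(sck), "Input") and addToAggregation, with BTrace taking care of the bookkeeping.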
Also, exposing the measured data as a property of a JMX bean is just a matter of annotating the accumulator variable with the @Property annotation. Unfortunately, this doesn't work for aggregations yet.
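For comparison, this is roughly what publishing two counters over JMX looks like when you do it by hand with the standard javax.management API. Take it as a sketch under my own assumptions: the class JmxCounterSketch, the Counters interface and the object name "sketch:type=SocketMonitor" are all invented for the illustration and have nothing to do with the bean name BTrace actually registers.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

/** Hypothetical hand-written equivalent of publishing two counters as JMX attributes. */
public class JmxCounterSketch {
    /** Management interface: each getter becomes a read-only attribute (DataIn, DataOut). */
    public interface Counters {
        long getDataIn();
        long getDataOut();
    }

    public static final AtomicLong dataIn = new AtomicLong();
    public static final AtomicLong dataOut = new AtomicLong();

    /** Registers the counters under a made-up object name and returns that name. */
    public static ObjectName register(MBeanServer server) throws Exception {
        ObjectName name = new ObjectName("sketch:type=SocketMonitor"); // invented name
        if (server.isRegistered(name)) {
            server.unregisterMBean(name); // keep the sketch safe to call repeatedly
        }
        Counters impl = new Counters() {
            @Override public long getDataIn() { return dataIn.get(); }
            @Override public long getDataOut() { return dataOut.get(); }
        };
        server.registerMBean(new StandardMBean(impl, Counters.class), name);
        return name;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = register(server);
        dataIn.addAndGet(1024);
        // Any JMX client (jconsole, for instance) would read the attribute the same way
        System.out.println("DataIn = " + server.getAttribute(name, "DataIn"));
    }
}
```

With BTrace none of this plumbing is needed in the script; the annotation on the field is all you write.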
I hope you enjoyed this small excursion into BTrace scripting, and I hope to bring you more interesting stuff in the time to come.