Archive for February, 2015

XMPP Packet capture in NoSQL DataBase (BaseX)

BaseX Blog

Introduction

This ‘Blurt/Blog’ details my experiences of a recent telephony & messaging project which made use of the following technologies: BaseX database, XQJ, XQuery, Java & Spring, JAXB, Hibernate, XMPP, Openfire server.

Context

Here at Globility Ltd. we specialise in designing, developing and providing integrated communication services. I recently received an interesting client requirement to capture all XMPP (also called Jabber) packets sent or received as part of a telephony & messaging ecosystem. The packets had to be stored in their original form (for legal reasons). Naturally, the client then requested the facility to search/query this data, with the output intelligently summarised into a single line of status indicators per transaction (multiple packets make up the lifecycle of a single transaction, e.g. a multi-party conference call). This requirement would form the heart of the missing piece in the client's audit/compliance system.

From the outset, design decisions were made to ensure the DataLogger solution would be flexible enough to capture many forms of data, and to be easily extended to query it. That means it could be used in many situations where the exchange of high-frequency data needs to be captured and queried/analysed.

The Server

An XMPP server would be required to provide basic messaging, presence, and routing services. My experiences with the open source server ‘Openfire’ settled the argument of which one to use (it also provides a pubsub service, XEP-0060: Publish-Subscribe). The Openfire ‘plugin’ architecture allows you to extend and enhance the functionality of the server, so I decided to write a ‘plugin’ (leveraging the ‘out of the box’ XMPP services) which would be the main engine of the overall solution.

Packet Format (XML vs JSON)

The XMPP packets (org.xmpp.packet.Packet) had to be in XML (for existing upstream & downstream systems). JSON would have been the obvious alternative (XEP-0295: JSON Encodings for XMPP). Design decisions were made to accommodate a possible move to JSON in the future (general consensus is a move away from XML towards JSON, for most situations).

Storing Packets

I wanted to store the XMPP XML packets (of type org.xmpp.packet.Message & org.xmpp.packet.IQ) preserving their original form, and narrowed my choice down to 2 open source XML db contenders: BaseX (http://basex.org) & eXist-db (http://exist-db.org).
I settled on BaseX for various reasons:

  1. BSD license (imposing minimal restrictions on the redistribution of covered software)
  2. Allows storage of XML and JSON.
  3. Performs well with large data sets (https://mailman.uni-konstanz.de/pipermail/basex-talk/attachments/20100822/5b4d114d/attachment-0001.pdf).
  4. Has a scalable architecture allowing for future growth
  5. Has a very useful GUI (BaseX) which serves to visualise and administer the data
  6. Flexibility in modes of operation (embedded, standalone, client/server – supporting ACID safe transactions)
  7. Full set of relevant interfaces (XQuery; XQJ, a Java API for executing XQuery against an XML db; REST/RESTXQ; WebDAV)
  8. Lightweight, so I can have it running locally on my Mac while developing (and an Ubuntu build for production)

A major decision affecting storage size, performance and querying was whether to store each packet as a separate XML document or to store them all in a single giant XML document. I chose the separate-document-per-packet approach after considering how the data would potentially be queried.
Another major decision was whether to use a standard Java interface or native BaseX API calls to handle XML DataSources. I wanted to be as database-agnostic as possible, so I opted to use XQJ (http://xqj.net/). The XQJ API is to XML Databases as the JDBC API is to Relational Databases.

DataLogger Plugin

I’ve summarised the design/code of the DataLogger Openfire plugin:

  1. To create the DataLogger Plugin, implement the org.jivesoftware.openfire.container.Plugin interface and, in the overridden initializePlugin() method, create the DataLoggerComponent that extends org.xmpp.component.AbstractComponent.

 

  2. The DataLogger Plugin (or, as better coding style, some helper class) has to implement org.jivesoftware.openfire.interceptor.PacketInterceptor (and override the interceptPacket() method) to store org.xmpp.packet.IQ packets into BaseX.

 

  3. The DataLoggerComponent needs to override the appropriate handlers. It must also discover all pubsub nodes on the Openfire server and subscribe to them (in order to catch org.xmpp.packet.Message packets). Finally, it needs to register AdHocCommands (the basis of the data querying) with the AdHocCommandManager:

@Component
public class DataLoggerComponent extends AbstractComponent implements GtmsComponent {

    @Autowired
    public DataLoggerComponent(DataLoggerDBParams dataLoggerDBParams) {
        // ...
    }

    @Override
    public String getDescription() {
        return "Data Logger API Component";
    }

    @Override
    public String getName() {
        return "datalogger";
    }

    @Override
    public String getDomain() {
        return XMPPServer.getInstance().getServerInfo().getXMPPDomain();
    }

    @Override
    protected String[] discoInfoFeatureNamespaces() {
        return new String[] { DISCO_ITEMS, DATA_FORMS, GlobalConstants.ADHOC_COMMANDS,
                DATALOGGER_NAMESPACE };
    }

    @Autowired
    public void setAdHocCommandManager(AdHocCommandManager adHocCommandManager) {
        this.adHocCommandManager = adHocCommandManager;
    }

    @Override
    public IQ handleDiscoItems(IQ iq) {
        // ...
    }

    @Override
    protected IQ handleDiscoInfo(IQ iq) {
        // ...
    }

    @Override
    public IQ handleIQGet(IQ iq) {
        return this.handleIQPacket(iq);
    }

    @Override
    public IQ handleIQSet(IQ iq) {
        return this.handleIQPacket(iq);
    }

    @Override
    protected void handleIQError(IQ iq) {
        this.log.debug("IQ ERROR: " + iq.toXML());
    }

    @Override
    protected void handleIQResult(IQ iq) {
        this.log.debug("IQ RESULT: " + iq.toXML());
    }

    private IQ handleIQPacket(IQ iq) {
        // ...
    }

    @Override
    protected void handleMessage(Message message) {
        XQDataSource ds = null;
        XQConnection2 xqc = null;
        try {
            ds = new BaseXXQDataSource();
            ds.setProperty("serverName", this.dataLoggerDBParams.getServerName());
            ds.setProperty("port", this.dataLoggerDBParams.getPort());
            ds.setProperty("user", this.dataLoggerDBParams.getUser());
            ds.setProperty("password", this.dataLoggerDBParams.getPassword());
            ds.setProperty("databaseName", this.dataLoggerDBParams.getDatabaseName());
            xqc = (XQConnection2) ds.getConnection();
            XQItem item = xqc.createItemFromDocument(message.toString(), null, null);
        // ...
    }

    class MyIQResultListener implements IQResultListener {

        @Override
        public void receivedAnswer(IQ iq) {
            Element inIq = iq.getChildElement();
            // Guard against result packets that carry no child element
            if (iq.getType() == IQ.Type.result && inIq != null) {
                if ("http://jabber.org/protocol/disco#items".equals(inIq.getNamespaceURI())) {
                    allFirstLevelNodes = getNodesFrom(iq);
                }
                else if ("http://jabber.org/protocol/pubsub".equals(inIq.getNamespaceURI())) {
                    Element inPubSub = inIq.element("subscriptions");
                    if (inPubSub != null) {
                        // this is a result from a get all subscriptions request
                        allSubscriptions = getSubscriptionsFrom(iq);
                    }
                    else {
                        inPubSub = inIq.element("subscription");
                        if (inPubSub != null) {
                            // this is a result from a subscription request
                            String node = inPubSub.attributeValue("node");
                            String subscriptionStatus = inPubSub.attributeValue("subscription");
                            if ("subscribed".equals(subscriptionStatus)) {
                                allSubscriptions.add(node);
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void answerTimeout(String packetId) {
            log.debug("IQResultListener timed out, packetId: " + packetId);
            // notificationServiceOnline = false;
        }
    }

    /**
     * Extracts a list of node names from an IQ packet.
     * @param packet the packet
     * @return a list of node names
     */
    private static Collection<String> getNodesFrom(IQ packet) {
        Collection<String> nodes = new ArrayList<String>();
        Element query = packet.getChildElement();
        Iterator<Element> itemsIterator = query.elementIterator("item");
        if (itemsIterator != null) {
            while (itemsIterator.hasNext()) {
                Element itemElement = itemsIterator.next();
                String node = itemElement.attributeValue("node");
                // Skip items without a node attribute instead of failing on null
                if (node != null) {
                    nodes.add(node);
                }
            }
            return nodes;
        }
        return null;
    }

    /**
     * Extracts a list of subscriptions from an IQ packet.
     * @param packet the packet
     * @return the names of the nodes whose subscription state is "subscribed"
     */
    private static Collection<String> getSubscriptionsFrom(IQ packet) {
        Collection<String> subscriptions = new ArrayList<String>();
        Element childElement = packet.getChildElement();
        Element subscriptionsElement = childElement.element("subscriptions");
        Iterator<Element> itemsIterator = subscriptionsElement.elementIterator("subscription");
        if (itemsIterator != null) {
            while (itemsIterator.hasNext()) {
                Element itemElement = itemsIterator.next();
                // Constant-first comparison avoids a NullPointerException when the attribute is missing
                if ("subscribed".equals(itemElement.attributeValue("subscription"))) {
                    String node = itemElement.attributeValue("node");
                    if (node != null) {
                        subscriptions.add(node);
                    }
                }
            }
            return subscriptions;
        }
        return null;
    }

    @Override
    public void postComponentStart() {
        // flag the data logger as enabled
        JiveGlobals.setProperty(DataLoggerConstants.Properties.ENABLED, "TRUE");
        this.dataLoggerPubSubManager = new DataLoggerPubSubManager(this);
        try {
            requestAllPubSubNodes();
            requestAllPubSubSubscriptions();
            Collection<String> nodesNotSubscribed = difference(this.allFirstLevelNodes, this.allSubscriptions);
            requestSubscriptions(nodesNotSubscribed);
            this.adHocCommandManager.addCommand(new SearchDataLoggerDBCallSummary(this, this.dataLoggerDBParams));
            this.adHocCommandManager.addCommand(new SearchDataLoggerDBMsgSummary(this, this.dataLoggerDBParams));
            this.adHocCommandManager.addCommand(new SearchDataLoggerDBVBlastSummary(this, this.dataLoggerDBParams));
        } catch (Exception e) {
            log.error(GtmsLog.exceptionToString(e));
        }
    }

    private Collection<String> difference(Collection a, Collection b) {
        if (b == null || a == null) {
            return a;
        }
        Collection diff = CollectionUtils.subtract(a, b);
        return diff;
    }

    private void requestAllPubSubNodes() {
        log.debug("Requesting all first level nodes known to pubsub service");
        IQ iq1 = PubSubHelper.buildPubSubAllFirstLevelNodes(getJID().toString(), this.getDomain());
        IQRouter iqRouter = XMPPServer.getInstance().getIQRouter();
        iqRouter.addIQResultListener(iq1.getID(), new MyIQResultListener());
        iqRouter.route(iq1);
        log.debug("Sent request: " + iq1.toXML());
    }

    private void requestAllPubSubSubscriptions() {
        log.debug("Requesting all datalogger pubsub subscriptions");
        IQ iq1 = PubSubHelper.buildPubSubAllSubscriptions(getJID().toString(), this.getDomain());
        IQRouter iqRouter = XMPPServer.getInstance().getIQRouter();
        iqRouter.addIQResultListener(iq1.getID(), new MyIQResultListener());
        iqRouter.route(iq1);
        log.debug("Sent request: " + iq1.toXML());
    }

    private void requestSubscriptions(Collection<String> nodes) {
        log.debug("Requesting subscriptions to all first level unsubscribed nodes");
        // difference() can return null, so there may be nothing to subscribe to
        if (nodes == null) {
            return;
        }
        IQRouter iqRouter = XMPPServer.getInstance().getIQRouter();
        for (String node : nodes) {
            IQ iq1 = PubSubHelper.buildPubSubNodeSubscription(getJID().toString(), this.getDomain(), node);
            iqRouter.addIQResultListener(iq1.getID(), new MyIQResultListener());
            iqRouter.route(iq1);
            log.debug("Sent request: " + iq1.toXML());
        }
    }

    @Override
    public void sendPacket(Packet packet) throws ComponentException {
        this.compMan.sendPacket(this, packet);
    }

    @Override
    public void postComponentShutdown() {
        // ...
    }

}
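
The "which nodes are not yet subscribed" step in postComponentStart() is a plain set difference. The following is a minimal, JDK-only sketch of that idea and not the plugin's actual code: the class name NodeDiff and the method unsubscribed() are hypothetical. Unlike difference() above, which relies on Commons Collections and returns its first argument unchanged when either input is null, this sketch treats null as an empty collection and always returns a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not part of the original plugin). */
final class NodeDiff {

    private NodeDiff() {
    }

    /**
     * Returns the node names present in allNodes but absent from subscribed,
     * keeping discovery order and dropping duplicates.
     * A null argument is treated as an empty collection.
     */
    static List<String> unsubscribed(Collection<String> allNodes, Collection<String> subscribed) {
        Set<String> remaining = new LinkedHashSet<>();
        if (allNodes != null) {
            remaining.addAll(allNodes);
        }
        if (subscribed != null) {
            remaining.removeAll(subscribed);
        }
        return new ArrayList<>(remaining);
    }

    public static void main(String[] args) {
        // Illustrative node names only
        List<String> nodes = Arrays.asList("calls", "messages", "presence");
        List<String> subs = Arrays.asList("messages");
        System.out.println(unsubscribed(nodes, subs)); // prints [calls, presence]
    }
}
```

Returning an empty list rather than null means the caller can iterate over the result without a separate null check.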

  4. In point 3 above, pay close attention to the overridden handleMessage() method. It stores org.xmpp.packet.Message packets into BaseX:

@Override
protected void handleMessage(Message message) {
    XQDataSource ds = null;
    XQConnection2 xqc = null;
    try {
        ds = new BaseXXQDataSource();
        ds.setProperty("serverName", this.dataLoggerDBParams.getServerName());
        ds.setProperty("port", this.dataLoggerDBParams.getPort());
        ds.setProperty("user", this.dataLoggerDBParams.getUser());
        ds.setProperty("password", this.dataLoggerDBParams.getPassword());
        ds.setProperty("databaseName", this.dataLoggerDBParams.getDatabaseName());
        xqc = (XQConnection2) ds.getConnection();
        XQItem item = xqc.createItemFromDocument(message.toString(), null, null);
        ….

  5. Use multi-stage Data Forms (XEP-0004: Data Forms) as a generic way of querying the stored XML data. Any client that wants to query the data can use them to pass in query criteria and thus generate customisable queries. This is achieved by extending org.jivesoftware.openfire.commands.AdHocCommand (and overriding the addStageInformation() and execute() methods):

public class SearchDataLoggerDBCallSummary extends DataLoggerAdHocCommand {

    @Override
    protected void addStageInformation(SessionData data, Element command) {
        DataForm form = new DataForm(DataForm.Type.form);
        form.setTitle("DataLogger Search: Call Summary");
        form.addInstruction("Form to query Datalogger Database to bring back summary of calls.");

        FormField field = form.addField();
        field.setType(FormField.Type.hidden);
        field.setVariable("FORM_TYPE");
        field.addValue(DATALOGGER_NAMESPACE);

        field = form.addField();
        field.setType(FormField.Type.text_single);
        field.setLabel("Profile (User/Group)");
        field.setVariable("profile");
        field.setDescription("The User or Group Profile");

        field = form.addField();
        field.setType(FormField.Type.text_single);
        field.setLabel("History From (milliseconds)");
        field.setVariable("timestamp");
        field.setDescription("Call history from this point in time");
        field.setRequired(true);

        field = form.addField();
        field.setType(FormField.Type.list_single);
        field.setLabel("Sort by");
        field.setVariable("sortby");
        field.setDescription("Sort returned data");
        field.addValue("number($distinct-id)");
        field.addOption("Call ID", "number($distinct-id)");
        field.addOption("Timestamp", "number($start-time3)");
        field.addOption("State", "$state");
        field.addOption("Destination", "number($called[1])");
        field.addOption("Direction", "$direction");
        field.addOption("Source", "number($caller[1])");
        field.addOption("Duration", "number($duration)");
        field.setRequired(true);

        // Add the form to the command
        command.add(form.getElement());
    }

    @Override
    public void execute(SessionData data, Element command) {
        String profile = getValueFromData(data, "profile");
        String timestamp = getValueFromData(data, "timestamp");
        String sortby = getValueFromData(data, "sortby");
        String ascdesc = getValueFromData(data, "ascdesc");

        XQDataSource ds = null;
        XQConnection2 xqc = null;
        try {
            ds = new BaseXXQDataSource();
            ds.setProperty("serverName", this.dataLoggerDBParams.getServerName());
            ds.setProperty("port", this.dataLoggerDBParams.getPort());
            ds.setProperty("user", this.dataLoggerDBParams.getUser());
            ds.setProperty("password", this.dataLoggerDBParams.getPassword());
            ds.setProperty("databaseName", this.dataLoggerDBParams.getDatabaseName());
            xqc = (XQConnection2) ds.getConnection();
            // This code queries the XML db and prints out results
            // ...
            XQPreparedExpression xqp = xqc.prepareExpression(xqueryString);
            // Bind the variables and execute the query
            xqp.bindDouble(new QName("start"), start, null);
            xqp.bindDouble(new QName("max"), max, null);
            XQResultSequence rs = xqp.executeQuery();
            while (rs.next()) {
                String str = rs.getItemAsString(null);
                // ...
            }
            } // closes a block whose opening is not shown
            FormField field = form.addField();
            field.setLabel(DATA[8]);
            field.setVariable(DATA[8]);
            field.setDescription("The size of returned result set");
            field.addValue(resCount);
            xqc.close();
            command.add(form.getElement());
            note.addAttribute("type", "info");
            note.setText("Operation finished successfully");
            logger.debug("SearchDataLoggerDBCallSummary: End");
        } catch (Exception e) {
            logger.error(GtmsLog.exceptionToString(e));
            note.addAttribute("type", "error");
            note.setText(e.getMessage());
        } finally {
            if ((xqc != null) && (!xqc.isClosed())) {
                try {
                    xqc.close();
                } catch (Exception ex) {
                    logger.error(GtmsLog.exceptionToString(ex));
                }
            }
        }
    }

    @Override
    protected List<Action> getActions(SessionData data) {
        return Arrays.asList(AdHocCommand.Action.complete);
    }

    @Override
    public String getCode() {
        return DataLoggerConstants.DATALOGGER_NAMESPACE + "#search-datalogger-callsummary";
    }

    @Override
    public String getDefaultLabel() {
        return "Search datalogger call summary";
    }

    @Override
    protected Action getExecuteAction(SessionData data) {
        return AdHocCommand.Action.complete;
    }

    @Override
    public int getMaxStages(SessionData data) {
        return 1;
    }
}
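
The listing does not show how the submitted sortby value reaches the query. The option values are XQuery fragments, so if they are spliced into the query text (an assumption, since that step is not shown), a client could submit any expression it likes. One way to guard against that is to look the submitted value up in a fixed table and reject anything else. The sketch below is a hypothetical helper, not the plugin's code: the class name SortKeys and the short keys are invented here, while the XQuery fragments are the ones from the form options above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: resolves a short sort key to a known XQuery fragment. */
final class SortKeys {

    private static final Map<String, String> ORDER_BY;

    static {
        Map<String, String> m = new LinkedHashMap<>();
        // Keys are illustrative; values are the option values used in the form
        m.put("callid", "number($distinct-id)");
        m.put("timestamp", "number($start-time3)");
        m.put("state", "$state");
        m.put("destination", "number($called[1])");
        m.put("direction", "$direction");
        m.put("source", "number($caller[1])");
        m.put("duration", "number($duration)");
        ORDER_BY = Collections.unmodifiableMap(m);
    }

    private SortKeys() {
    }

    /** Returns the XQuery fragment for a known key, or throws for anything else. */
    static String orderByExpression(String key) {
        String expr = (key == null) ? null : ORDER_BY.get(key.trim().toLowerCase(Locale.ROOT));
        if (expr == null) {
            throw new IllegalArgumentException("Unsupported sort key: " + key);
        }
        return expr;
    }

    public static void main(String[] args) {
        System.out.println(orderByExpression("Duration")); // prints number($duration)
    }
}
```

With a lookup like this, only fragments defined on the server side can end up in the query text, whatever the client submits.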

  6. The following is an example of the multi-stage Data Form request & response:

------------------------------------------------------------------------
Execute the search command search-datalogger-callsummary (multi-stage)
------------------------------------------------------------------------
<iq from="…" to="datalogger…"
    type="set" id="search1">
  <command xmlns="http://jabber.org/protocol/commands"
           node="http://gltd.net/protocol/datalogger:01:00:00#search-datalogger-callsummary"
           action="execute" />
</iq>
----- The server returns a new form. Required values are marked and can be
----- of type text-single, boolean, etc. I recommend looking at the
----- Data Forms spec to see all possible types.
<iq type="result" id="search1" from="datalogger…" to="harrytest5@…">
  <command xmlns="http://jabber.org/protocol/commands" sessionid="…"
           node="http://gltd.net/protocol/datalogger:01:00:00#search-datalogger-callsummary"
           status="executing">
    <x xmlns="jabber:x:data" type="form">
      <title>DataLogger Search: Call Summary</title>
      <instructions>Fill out this form to request a search of the
        Datalogger Database to bring back a summary of calls.
      </instructions>
      <field type="hidden" var="FORM_TYPE">
        <value>http://gltd.net/protocol/datalogger:01:00:00</value>
      </field>
      <field type="text-single" label="Profile (User/Group)" var="profile">
        <desc>The User or Group Profile</desc>
      </field>
      <field type="text-single" label="History From (milliseconds)"
             var="timestamp">
        <desc>Call history from this point in time</desc>
        <required />
      </field>
      <field type="list-single" label="Sort by" var="sortby">
        <desc>Sort returned data</desc>
        <value>$distinct-id</value>
        <option label="Call ID">
          <value>$distinct-id</value>
        </option>
        <option label="Timestamp">
          <value>$start-time3</value>
        </option>
        <option label="State">
          <value>$state</value>
        </option>
        <option label="Destination">
          <value>$called[1]</value>
        </option>
        <option label="Direction">
          <value>$direction</value>
        </option>
        <option label="Source">
          <value>$caller[1]</value>
        </option>
        <required />
      </field>
    </x>
    <actions execute="complete">
      <complete />
    </actions>
  </command>
</iq>
The second stage of the multi-stage Data Form would be:
----- The client takes the above result from the 1st stage and produces the 'set' request below
<iq type="set" id="search2" from="…" to="datalogger…">
  <command xmlns="http://jabber.org/protocol/commands"
           node="http://gltd.net/protocol/datalogger:01:00:00#search-datalogger-callsummary"
           sessionid="…">
    <x xmlns="jabber:x:data" type="submit">
      <field type="hidden" var="FORM_TYPE">
        <value>http://gltd.net/protocol/datalogger:01:00:00</value>
      </field>
      <field type="text-single" label="Profile (User/Group)" var="profile">
        <value>gary_office…</value>
      </field>
      <field type="text-single" label="History From (milliseconds)" var="timestamp">
        <value>1486790948174</value>
      </field>
      <field type="list-single" label="Sort by" var="sortby">
        <value>$distinct-id</value>
      </field>
    </x>
  </command>
</iq>
----- Result from the above request
<iq type="result" id="search2" from="datalogger.sirius.gltd.local"
    to="harrytest5@sirius.gltd.local/Smack">
  <command xmlns="http://jabber.org/protocol/commands" sessionid="xJQROWl15FO47te"
           node="http://gltd.net/protocol/datalogger:01:00:00#search-datalogger-callsummary"
           status="completed">
    <note type="info">Operation finished successfully</note>
    <x xmlns="jabber:x:data" type="result">
      <reported>
        <field var="callid" type="text-single" label="callid" />
        <field var="count" type="text-single" label="count" />
        <field var="state" type="list-single" label="state" />
        <field var="direction" type="text-single" label="direction" />
        <field var="called" type="text-single" label="called" />
        <field var="caller" type="text-single" label="caller" />
        <field var="duration" type="text-single" label="duration" />
        <field var="timestamp" type="text-single" label="timestamp" />
        <field var="resultcount" type="text-single" label="resultcount" />
      </reported>
      <item>
        <field var="timestamp">
          <value>1385646662893</value>
        </field>
        <field var="caller">
          <value>6004</value>
        </field>
        <field var="duration">
          <value>14</value>
        </field>
        <field var="count">
          <value>4</value>
        </field>
        <field var="callid">
          <value>1385646172807</value>
        </field>
        <field var="direction">
          <value>Outgoing</value>
        </field>
        <field var="called">
          <value>770371</value>
        </field>
        <field var="state">
          <value>Answered</value>
        </field>
      </item>
      <item>
        <field var="timestamp">
          <value>1385646692313</value>
        </field>
        <field var="caller">
          <value>6004</value>
        </field>
        <field var="duration">
          <value>6</value>
        </field>
        <field var="count">
          <value>4</value>
        </field>
        <field var="callid">
          <value>1385646172808</value>
        </field>
        <field var="direction">
          <value>Outgoing</value>
        </field>
        <field var="called">
          <value>771338</value>
        </field>
        <field var="state">
          <value>Answered</value>
        </field>
      </item>

      <field label="resultcount" var="resultcount">
        <desc>The size of returned result set</desc>
        <value>36</value>
      </field>
    </x>
  </command>
</iq>
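
On the client side, each <item> in a result like the one above is one summary row. The following is a minimal sketch of turning such a result into rows of var/value pairs using only the JDK's DOM parser. The class name DataFormResultReader and the method readItems() are hypothetical; a real client built on Smack would more likely use that library's own Data Forms classes.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical client-side helper (not part of the original plugin). */
final class DataFormResultReader {

    private static final String DATA_FORMS_NS = "jabber:x:data";

    private DataFormResultReader() {
    }

    /** Returns one map of field var -> first value for every <item> in the form. */
    static List<Map<String, String>> readItems(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

            List<Map<String, String>> rows = new ArrayList<>();
            NodeList items = doc.getElementsByTagNameNS(DATA_FORMS_NS, "item");
            for (int i = 0; i < items.getLength(); i++) {
                Element item = (Element) items.item(i);
                Map<String, String> row = new LinkedHashMap<>();
                NodeList fields = item.getElementsByTagNameNS(DATA_FORMS_NS, "field");
                for (int j = 0; j < fields.getLength(); j++) {
                    Element field = (Element) fields.item(j);
                    NodeList values = field.getElementsByTagNameNS(DATA_FORMS_NS, "value");
                    String value = values.getLength() > 0
                            ? values.item(0).getTextContent().trim() : "";
                    row.put(field.getAttribute("var"), value);
                }
                rows.add(row);
            }
            return rows;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse data form result", e);
        }
    }

    public static void main(String[] args) {
        // Cut-down version of the result stanza shown above
        String xml = "<x xmlns='jabber:x:data' type='result'>"
                + "<item>"
                + "<field var='callid'><value>1385646172807</value></field>"
                + "<field var='state'><value>Answered</value></field>"
                + "<field var='duration'><value>14</value></field>"
                + "</item>"
                + "</x>";
        for (Map<String, String> row : readItems(xml)) {
            System.out.println(row); // prints {callid=1385646172807, state=Answered, duration=14}
        }
    }
}
```

Because the lookup is by namespace and local name, the same method works whether it is handed the bare <x> element or the whole <iq> stanza. The trailing resultcount field sits outside any <item>, so it is not included in the rows.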

  7. A proof of concept for unmarshalling/marshalling using JAXB was successfully completed. This will be incorporated into a future version of the DataLogger Plugin.

NOTE: Belatedly, I realised that packets were being duplicated, as the pubsub Message packets were also coming in as IQ packets. I eventually switched over to using just the PacketInterceptor to capture all packets without duplication.

Development Environment (MacBook Pro)

  1. Eclipse-Kepler (Java 6, Spring, Openfire with HSQL database, Hibernate, BaseX, XQJ, JAXB)
  2. BaseXserver
  3. BaseXGUI (great for testing/running XQuery code). NOTE: Ensure you are not logged into the BaseX db through the basexclient or the basexgui, otherwise you will get the following error: java.sql.SQLException: The database is already in use by another process.
  4. Git/BitBucket
  5. Grinder for testing (performance & scalability). Manual testing during development was done using a Smack-based XMPP client to inject Message & IQ packets
  6. Jenkins – Continuous build/test

Architecture Diagram

[Figure: architecture diagram, image not available]


Posted on February 9, 2015.
