Qpid broker-j  can be embedded into another Java application. This is an especially attractive option for functional tests where dependency starting a stand alone broker is undesired. This page provides instructions how to do so using Qpid Broker-J 7.0 components.

The goal will be to keep the broker in memory without the need for a writable disk. That means that a couple of features (message persistence, flow-to-disk, ...) will not be available. Adapting these instructions should be easy enough.

Configuring Dependencies

The configuration in this HowTo requires the following dependencies (in maven's pom.xml format):

Broker dependencies
<dependencies>
  <!-- Note that the below is qpid-broker-core and *not* qpid-broker. -->
  <dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-broker-core</artifactId>
    <version>7.0.0</version>
  </dependency>
  <!-- AMQP protocol support is modular. Here we show support for 0-8/0-9 and 1.0. Support for 0-10 is excluded. -->
  <dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
    <version>7.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
    <version>7.0.0</version>
  </dependency>
  <!-- If a different store type is required it needs to be included here -->
  <dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-broker-plugins-memory-store</artifactId>
    <version>7.0.0</version>
  </dependency>
</dependencies>

The support for AMQP versions in Qpid Broker-J is modular. The following gavs are required for the different protocol versions:

  • For AMQP 0-8, 0-9, and 0-9-1:
    org.apache.qpid:qpid-broker-plugins-amqp-0-8-protocol:7.0.0

  • For AMQP 0-10:
    org.apache.qpid:qpid-broker-plugins-amqp-0-10-protocol:7.0.0
  • For AMQP 1.0:
    org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:7.0.0

Providing a minimal config file

The broker contains an initial configuration that we want to replace for the embedded case. The initial configuration below declares only broker itself, a Scram SHA256 authentication provider with single use entry for user name "admin" having password "admin", amqp port where authentication provider is set to Scram SHA256 authentication provider and Memory Virtual host. This initial configuration should be placed into the resource folder of the java module.

Minimal test configuration (test-initial-config.json)
{
  "name": "Embedded Broker",
  "modelVersion": "7.0",
  "authenticationproviders" : [ {
    "name" : "Scram",
    "type" : "SCRAM-SHA-256",
    "users" : [ {
      "name" : "admin",
      "type" : "managed",
      "password" : "RlZheCx8NY5fmWpshDA2P1gULzLPA8GtoCe5uJgr4ms=,,mnHeeEsCki4ql4slKwooAncju7cL+5QLSxhUMzmvMJw=,z95S+Lmz5v5mESAmov1mJZN17310oMuAPyJIWtg4gBo=,4096"
    } ]
  } ],
  "ports" : [  {
    "name" : "AMQP",
    "port" : "${qpid.amqp_port}",
    "protocols": [ "AMQP_0_9", "AMQP_0_9_1", "AMQP_1_0" ],
    "authenticationProvider" : "Scram",
    "virtualhostaliases" : [ {
      "name" : "nameAlias",
      "type" : "nameAlias"
    }, {
      "name" : "defaultAlias",
      "type" : "defaultAlias"
    }, {
      "name" : "hostnameAlias",
      "type" : "hostnameAlias"
    } ]
  }],
  "virtualhostnodes" : [ {
    "name" : "default",
    "type" : "Memory",
    "defaultVirtualHostNode" : "true",
    "virtualHostInitialConfiguration" : "{\"type\": \"Memory\", \"nodeAutoCreationPolicies\": [{\"patterns\":\".*\",\"createdOnPublish\":\"true\",\"createdOnConsume\":\"true\",\"nodeType\":\"queue\",\"attributes\":{}}] }"
  }]
}

This shows the Port supporting AMQP versions 0-9, 0-9-1 and 1.0. If support for other protocols is required remember to include the dependency in the pom.xml.

Starting the Broker

The following code shows how to start the broker by creating and starting a SystemLauncher:

Start Broker, perform messaging operations, close broker and release system resources
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.qpid.server.SystemLauncher;

public class EmbeddedBroker
{
    private static final String INITIAL_CONFIGURATION = "test-initial-config.json";

    public static void main(String args[]) {
        EmbeddedBroker broker = new EmbeddedBroker();
        try {
            broker.start();
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private void start() throws Exception {
        final SystemLauncher systemLauncher = new SystemLauncher();
        try {
            systemLauncher.startup(createSystemConfig());
            // performMessagingOperations();
        } finally {
            systemLauncher.shutdown();
        }
    }

    private Map<String, Object> createSystemConfig() {
        Map<String, Object> attributes = new HashMap<>();
        URL initialConfig = EmbeddedBroker.class.getClassLoader().getResource(INITIAL_CONFIGURATION);
        attributes.put("type", "Memory");
        attributes.put("initialConfigurationLocation", initialConfig.toExternalForm());
        attributes.put("startupLoggedToSystemOut", true);
        return attributes;
    }
}

Final Remarks

Note

Please note, that provided code samples do not start broker with http management. If http management is required, the http port and http-management plugin need to be added into initial configuration. A maven dependency for qpid-broker-plugins-management-http needs to be added into project as well.

  • No labels