This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • Link Registry Design
Skip to end of metadata
Go to start of metadata

WIP

This page is still very much Work In Progress. It should be consider a draft.


Introduction

This page describes the design of a "Link Registry". This refers to "Links" as defined by the AMQP 1.0 specification.

There are a couple of situations in which the broker needs to quickly look up links.

  • When Attaching a Link the uniqueness of the ordered tuple (source-container-id, target-container-id, link-name) must be ensured.
  • When Attaching with a "null" Source or Target the existing Terminus should be easily recovered from the Link name.
  • When resuming a Link.

For this and other purposes the creation of a "Link Registry" is proposed.

Terminology and Responsibilities

The following is all extracted from the AMQP 1.0 specification. Where they disagree the specification is obviously the authoritative source.

  • between a source and a target
  • linkname enough to recover link (2.6.1.)
  • tuple (sourceContainerId, targetContainerId, linkName) MUST be globally unique. => only active on one connection
  • can be stolen by sending second Attach while not suspended (2.6.1)

Terminus (source or target)

  • keep track of "unsettled" delivery-tags & delivery-state
  • source assigns unique "delivery-tag" to delivery attempt
  • can exist without link endpoint ("suspended link")
  • should maintain association with LinkEndpoint if any to do Link recovery/resuming (Rob, thinks this association does not belong in the Terminus. I don't understand why).
  • termini can be associated with new endpoints ("resuming link")
  • has durability 0 (no state), 1 (config), 2 (config + unsettled durable messages)

Link Endpoints (sender or receiver)

  • Interface between terminus and session endpoint
  • Lifetime: deleted when either there is a error, the Session ends, or the Link is closed (not just Detached)
  • maintains additional state 
  • endpoint assigns unique delivery-id to message. (alias for delivery-tag?). this loses meaning (becomes invalid) when resuming a link because it is a new link endpoint
  • no state in the endpoint required to resume a link. this comes from the terminus
  • has "handle" to identify the link on the wire (alias). assigned by Attach. local handle = output handle; remote handle = input handle
  • "attached" if has both handles, otherwise "detached" or "half detached"

Session Endpoint

  • We call this Session_1_0
  • mapping between channel
  • must detach link if terminus is not created/found upon Attach (rejecting link)
  • Establish: Terminus does not exist, Link Endpoint does not exist, handle not assigned, no state
  • Resuming: Terminus exists, LinkEndpoint does not exist, handle not assigned, unsettled state
  • Reattaching: Terminus exists, LinkEndpoint exists, handle not assigned, no state
  • Recovering: Terminus exists, endpoint?, handle?, state?

 

Further Considerations

Thread Safety

The Link Registry will be used across multiple connections and thus has to be thread-safe!

Especially Link Stealing seems dangerous. When stealing Link "foo" the Attaching Link (on IO-Thread-A) will have to wait for the Detach of the previously Attached Link being sent and processed by the broker (on IO-Thread-B). If at the same time Link "bar" is stolen from IO-Thread-A by IO-Thread-B there could be a deadlock if we are not careful. (Potentially use ConfigThread as serialising/coordiantor? bottleneck?)

Input Validation

The Link Registry will use user supplied values (link names, container ids). This information can come from both the config and over the wire. We have to make sure we validate both.

Specification Compliance

There are a couple of related sections in the AMQP 1.0 specification that we have to adhere to:

  • receiver must discard messages if considered settled and resumed delivery flag set
  • receiver must consider that the sender considers settled as settled
  • it is possible that resuming does not succeeds due to incomplete unsettled maps (2.6.13.)
  • when resuming a link we must include the unsettled state
  • must be able to do null-source lookup (null-target lookup?)

Requirements

  • recover link from tuple (sourceContainerId, targetContainerId, linkName)
  • recover unsettled state
  • high frequency & low latency update of unsettled state (JSON fail!)
  • handle different DeliveryStates (AMQP 1.0 section 3.4)

Remembering Remote Terminus

Do we have to persist the remote Terminus? In the normal broker operation this seems unnecessary but when thinking about Broker federation we might want to remember for example whether the remote Terminus is a Queue or a Topic to validate upon resuming the Link.

Personally, I (Lorenz) think this is not necessary but if possible we should keep the design flexible enough to allow for a future extension.

Storage

  • Persistence of Termini and persistence of deliveryState/unsettled state should be separated. Persistence of unsettled state is out of scope for this document.
  • A unsettledStore should support different levels of capability in alignment with the TerminusDurability.
    For example a Memory VH should not advertise any TerminusDurability, a JSON VirtualHost might be able to persist the configuration but not the settled state, while a BDB VH might support all three terminus durability modes.
    For now we do not provide TerminusDurability=2 
  • This is another example of where we should differentiate between ConfigurableObjects and PersistentObjects which seems to be orthogonal concepts. LinkRegistry should be Persistent but not Configurable.
  • Therefore, to not further make bad things worse, separate LinkRegistryStore out into distinct store? Same type as ConfigStore?
  • Store needs to preplicated in HA case. Therefore, each VH needs its own store.
    • JSON:

      {
        "version" : "1.0",
        "sendingLinks" : {
          remoteContainerId1 : {
            linkName1 : {
              "source" : {
                /* source attributes */
              }
            },
            linkName2 : {
              "source" : {
                /* source attributes */
              }
            },
            ...
          },
          remoteContainerId2 : {
            linkName1 : {
              "source" : {
                /* source attributes */
              }
            },
            ...
          },
          ...
        }
        "receivingLinks" : {
          remoteContainerId1 : {
            linkName1 : {
              "target" : {
                /* target attributes */
              }
            },
            ...
          },
          ...
        }
      }
    • BDB (needs to be versioned):
      • key: (sourceContainerId, targetContainerId, linkName) 
      • value: (SourceDef, TargetDef)
        where one of the two MAY be null
    • JDBC (needs to be versioned)
      • COLUMNS: (sourceContainerId, targetContainerId, linkName, SourceDef, TargetDef)
        where either SourceDef xor TargetDef MAY be null

Design v1 (obsolete)

Brainstorming

  • As is currently the case each VirtualHost should have its own registry.
  • Both Sessions and Links are abstract immaterial concepts. The concrete objects are the SessionEndpoints and the LinkEndpoints.
    For Sessions we already only have one object (called Session_1_0 to be aligned to the legacy protocols but this really is the SessionEndpoint).
    We should do the same for Link/-Endpoints and  get rid of one of them. Currently the responsibilities are very unclear and there seems to be no value.
  • The implementation of the registry should be hidden behind an API so for example we do not care whether sending links and receiving links are maintained in the same data structure or in separate data structures.
  • We should ensure we discard the LinkEndpoint upon Detach(close=true), Detach(error), or Session#End(). Also it needs to be dissociated from the Terminus.
  • Given the remote container id and the link name a lookup should return a Terminus. If the terminus is currently associated with a LinkEndpoint it should be reachable from the Terminus; if not we should be able to recreate a LinkEndpoint (and thus a Link) dynamically from the Terminus.

Stpes

  1. Merge (Sending-/Receiving-)LinkEndpoint and (Sending-/Receiving-)Link; Remove LinkAttachment
  2. Create Terminus Interface with methods to associate/dissociate a LinkEndpoint and maintain delivery states
  3. Make Source and Target Derive from Terminus and move the (non-durable) delivery state bookkeeping from the LinkEndpoint to the Terminus
  4. Create class for the LinkRegistry.
    1. encapsulate remoteContainer in the registry
    2. values are Termini
  5. replace current Map<containerId, LinkRegistry> on VH with new LinkRegistry

 

In receiveAttach:

Session_1_0#receiveAttach
void receiveAttach(Attach attach) {
  source, target = getAddressSpace().getLinkRegistry().getSendingLink(getConnection().getLocalContainerId(),
                                                                      getConnection().getRemoteContainerId(),
                                                                      attach.getLinkName(),
                                                                      attach.getSource(),
                                                                      attach.getTarget());
  if (source == null) {
    establishLink(attach);
  } else {
    if (attach.getSource() == null) {
      recoverLink(attach, source, target);
    } else {
      if (source.getAssociatedLinkEndpoint() == null) {
        resumeLink(attach, source, target);
      } else {
        if (source.getAssociatedLinkEndpoint().getSession() != this) {
          stealLink(attach, source, target);
        } else {
          reattachLink(attach, source, target);
        }
      }
    }
  }
}
 

 

LinkRegistry API

  • LinkRegistry.putSendingLinkIfAbsent(String localContainerId, String remoteContrainerId, String linkName, Source source, Target target) -> Pair<Source, Target>
  • LinkRegistry.putReceivingLinkIfAbsent(String localContainerId, String remoteContrainerId, String linkName, Source source, Target target) -> Pair<Source, Target>
  • LinkRegistry.getSendvingLink(String localContainerId, String remoteContrainerId, String linkName) -> Pair<Source, Target>
  • LinkRegistry.getReceivingLink(String localContainerId, String remoteContrainerId, String linkName) -> Pair<Source, Target>
  • LinkRegistry.removeSendingLink(String localContainerId, String remoteContrainerId, String linkName) -> Pair<Source, Target>
  • LinkRegistry.removeReceivingLink(String localContainerId, String remoteContrainerId, String linkName) -> Pair<Source, Target>

Problems

  • Link stealing hard
  • association between Source, Target, and  LinkEndpoint not explicit (no Link object). makes attaching logic complecated (see above) and hard to get right (threading, atomicity)

 

Design v2 (obsolete)

Instead of a LinkRegistry we introduce a LinkManager with expanded responsibilities.

Link

The Link encapsulates the Source, Target, and LinkEndpoint and keeps a reference to the associated Session.

API

  • sendFlow
  • sendTransfer
  • sendDisposition
  • receiveFlow
  • receiveTransfer
  • receiveDisposition

The send* methods will most likely end up calling the corresponding methods on the session to do some session specific housekeeping and the actual sending

LinkManager API

  • Link attachLink(Session_1_0 s, Attach a) 
  • void detachLink(Link l, Detach d)

Problems

This design was abandond because it is further away from the spec. v3 tries to remedy this.

 

Design v3

As in the other designs the Terminus will hold persistent state (e.g., Map<delivery-tag, unsettled state>).

We should support a optional and configurable timeout for the Links. If the Links are not Attached for more than the timeout they should be removed from the LinkRegistry.

Also see attached IRC conversation between Rob and Lorenz, especially with regards to Link stealing.

LinkRegistry

The LinkRegistry is responsible for ensuring Link uniqueness and persistence.

API

  • getSendingLink(String remoteContainerId, String linkName) -> Link
    Always returns a non-null link. The session is responsible for checking that the link has valid Source and Targets.
    Needs to be thread-safe (e.g., two calls with the same arguments should return the same object)
  • getReceivingLink(String remoteContainerId, String linkName) -> Link
    See  #getSendingLink()

Non-Public API

The LinkRegistry should make the following functionality available to the Link

  • removeSendingLink(String remoteContainerId, String linkName)
  • removeReceivingLink(String remoteContainerId, String linkName)
  • updateLinkTermini(Link link, Source source, Target target)

Link

The Link encapsulates the Source, Target, and LinkEndpoint and no further state.

API

  • getLinkName() -> String
  • getLocalContainerId() -> String
  • getRemoteContainerId() -> String
  • attach(Session s, Source s, Target t) -> Future<LinkEndpoint>
    This must take care of link stealing (e.g., theifQueue).
    This would probably also communicate with the localContainer to create necessary configuration changes (Queue/Consumer creation, etc.)

LinkEndpoint

The LinkEndpoint is the object that the session and the consumer interact with.

This will have session related state (e.g., Map<delivery-ids, unsettled delivery state>).

API

  • detach()
    Depending on how we implement Link stealing this might or might not have to be asynchronous
  • send-/receive(Flow, Transfer, Disposition)

 

 

  • No labels