Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Step 2: Issue the OffsetCommitRequest or OffsetFetchRequest to the offset manager

Code Block
// How to commit offsets
//
// Builds an OffsetCommitRequest for two partitions, sends it over the already-open
// channel to the offset manager (see step 1), and inspects the per-partition error
// codes in the response to decide how to react.

        // The same wall-clock time is passed to every offset in this request.
        long now = System.currentTimeMillis();
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
        offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                MY_GROUP,
                offsets,
                correlationId++,
                MY_CLIENTID,
                (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
        try {
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                // NOTE(review): errors() is declared in Scala; its Short values may be exposed to
                // Java as Object, so iterate as Object and cast explicitly (works either way) - confirm.
                for (Object partitionError : commitResponse.errors().values()) {
                    short partitionErrorCode = (Short) partitionError;
                    if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                        // You must reduce the size of the metadata if you wish to retry
                    } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()
                            || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                        channel.disconnect();
                        // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
                    } else {
                        // log and retry the commit
                    }
                }
            }
        }
        catch (IOException ioe) {
            // The connection is no longer usable; drop it before rediscovering the offset manager.
            channel.disconnect();
            // Go to step 1 and then retry the commit
        }

// How to fetch offsets
//
// Builds an OffsetFetchRequest for the partitions of interest, sends it over the
// already-open channel to the offset manager (see step 1), and reads back the last
// committed offset and metadata for testPartition0.

        // Populate with the partitions whose last committed consumer offset we want to look up.
        List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
        partitions.add(testPartition0);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
                MY_GROUP,
                partitions,
                (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
                correlationId,
                MY_CLIENTID);
        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
            OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
            short offsetFetchErrorCode = result.error();
            if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the offset fetch to the new offset manager
            } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
                // retry the offset fetch (after backoff)
            } else {
                long retrievedOffset = result.offset();
                String retrievedMetadata = result.metadata();
            }
        }
        catch (IOException ioe) {
            // The connection is no longer usable; drop it before rediscovering the offset manager.
            channel.disconnect();
            // Go to step 1 and then retry the offset fetch after backoff
        }