Child pages
  • Consumer Group Example

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: ThreadPoolExecutor#execute is preferable

...

Code Block
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);


        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submitexecute(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

...