# language ja
バージョン0.6以降のCassandraは、保持しているデータに対するHadoopのジョブ実行をサポートしています。https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ にサンプルがあります(Hadoopジョブの_出力_をCassandraに入力するのももちろん可能です)。Cassandraの行または行フラグメント(キーと、カラムの{{SortedMap}}のペア)がMapタスクの入力となります。それは各行からどのカラムをフェッチするかを決めるための SlicePredicate
で指定されます。word_countサンプルでは1つのカラムを各行から選択していますが、その部分の処理は以下のようになっています。
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
Javaのコードを記述するのではなくPigのDSLでジョブを実行出来るようにCassandraは{{LoadFunc}}を提供します。https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/ にあります。
Cassandraのsplitはロケーションアウェアです(これはHadoopのInputSplitの設計の本質でもあります)。CassandraはデータのsplitとともにロケーションのリストもHadoopに渡します。Hadoopはデータに一番近いインスタンスでジョブを実行するようにスケジューリングします。ということはHadoopインスタンスをCassandraのマシンそれぞれに載せるのがよいでしょう。
0.6.2/0.7より前のリリースでは、ジョブを失敗させる原因となるリソースのリークが確認されています(コネクションが正常にリリースされないことによるリソースリーク)。環境の設定によりますが、この問題に遭遇することがあります。回避策としては、プロセスがオープン可能なファイルディスクリプタの上限を上げることです(linux/bashでは{{ulimit -n 32000}}とする、等)。エラーはHadoopのジョブ側にThriftのTimedOutExceptionとして報告されます。
1つのノードに対してHadoop連携テストを行っていて何らかのエラーが発生した場合、それは正常なことかもしれません。1台のマシンに対して負荷を与えすぎるとタイムアウトエラーが発生します。その場合、同時実行するタスクの数を減らすことで回避することが可能です。
Configuration conf = job.getConfiguration(); conf.setInt("mapred.tasktracker.map.tasks.maximum",1);
また、Cassandraからバッチで読み込む行数を減らすには以下のようにします。
ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000);