/** * Copyright (C) 2007-2012 Hypertable, Inc. * * This file is part of Hypertable. * * Hypertable is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 3 * of the License, or any later version. * * Hypertable is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301, USA. */ package org.hypertable.examples.PerformanceTest; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; import java.util.Vector; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.TException; import org.hypertable.Common.Checksum; import org.hypertable.thriftgen.*; import org.hypertable.thrift.SerializedCellsFlag; import org.hypertable.thrift.SerializedCellsReader; import org.hypertable.thrift.SerializedCellsWriter; import org.hypertable.thrift.ThriftClient; public class DriverHypertable extends Driver { static final Logger log = Logger.getLogger("org.hypertable.examples.PerformanceTest"); public static final int CLIENT_BUFFER_SIZE = 1024*1024*12; public static final int DEFAULT_THRIFTBROKER_PORT = 38080; public DriverHypertable() throws TTransportException, TException { } public DriverHypertable(int broker_port) { mThriftBrokerPort = broker_port; } protected void finalize() { if (mSetup.parallelism == 0) { try { if (mNamespaceId != 0) mClient.namespace_close(mNamespaceId); } catch (Exception e) { System.out.println("Unable to close namespace - " + mNamespace + e.getMessage()); System.exit(-1); } } } public void setup(Setup setup) { super.setup(setup); if (mSetup.parallelism == 0) { mCellsWriter = new SerializedCellsWriter(CLIENT_BUFFER_SIZE); try { mClient = ThriftClient.create("localhost", mThriftBrokerPort); mNamespace = "/"; mNamespaceId = mClient.namespace_open(mNamespace); } catch (Exception e) { System.out.println("Unable to establish connection to ThriftBroker at localhost:38080 " + "and open namespace '/'- " + e.getMessage()); System.exit(-1); } } try { mCommon.initializeValueData(); if (mSetup.parallelism > 0) { mThreadStates = new DriverThreadState[mSetup.parallelism]; for (int i=0; i 0) { mCellsWriter = new SerializedCellsWriter(mSetup.keySize + family.length + qualifier.length + value.length + 32); if (!mCellsWriter.add(row, 0, mSetup.keySize, family, 0, family.length, qualifier, 0, qualifier.length, SerializedCellsFlag.AUTO_ASSIGN, value, 0, value.length, SerializedCellsFlag.FLAG_INSERT)) { System.out.println("Failed to write to SerializedCellsWriter"); System.exit(-1); } synchronized (mThreadStates[mThreadNext]) { mThreadStates[mThreadNext].updates.add(mCellsWriter); mThreadStates[mThreadNext].notifyAll(); } mThreadNext = (mThreadNext + 1) % mSetup.parallelism; mCellsWriter = null; } else { while (!mCellsWriter.add(row, 0, mSetup.keySize, family, 0, family.length, qualifier, 0, qualifier.length, SerializedCellsFlag.AUTO_ASSIGN, value, 0, value.length, SerializedCellsFlag.FLAG_INSERT)) { mClient.mutator_set_cells_serialized(mMutator, mCellsWriter.buffer(), false); mCellsWriter.clear(); } } } if (mSetup.parallelism == 0) { if (!mCellsWriter.isEmpty()) mClient.mutator_set_cells_serialized(mMutator, mCellsWriter.buffer(), true); else mClient.mutator_flush(mMutator); } } catch (Exception e) { e.printStackTrace(); log.severe(e.toString()); throw new IOException("Unable to set cell via thrift - " + e.toString()); } } else if (mSetup.type == Setup.Type.INCR) { byte [] row = new byte [ mSetup.keySize ]; byte [] family = mCommon.COLUMN_FAMILY_BYTES; byte [] qualifier = mCommon.COLUMN_QUALIFIER_BYTES; byte [] value = mCommon.INCREMENT_VALUE_BYTES; if (mSetup.parallelism == 0) mCellsWriter.clear(); try { for (long i=task.start; i 0) { mCellsWriter = new SerializedCellsWriter(mSetup.keySize + family.length + qualifier.length + value.length + 32); if (!mCellsWriter.add(row, 0, mSetup.keySize, family, 0, family.length, qualifier, 0, qualifier.length, SerializedCellsFlag.AUTO_ASSIGN, value, 0, value.length, SerializedCellsFlag.FLAG_INSERT)) { System.out.println("Failed to write to SerializedCellsWriter"); System.exit(-1); } synchronized (mThreadStates[mThreadNext]) { mThreadStates[mThreadNext].updates.add(mCellsWriter); mThreadStates[mThreadNext].notifyAll(); } mThreadNext = (mThreadNext + 1) % mSetup.parallelism; mCellsWriter = null; } else { while (!mCellsWriter.add(row, 0, mSetup.keySize, family, 0, family.length, qualifier, 0, qualifier.length, SerializedCellsFlag.AUTO_ASSIGN, value, 0, value.length, SerializedCellsFlag.FLAG_INSERT)) { mClient.mutator_set_cells_serialized(mMutator, mCellsWriter.buffer(), false); mCellsWriter.clear(); } } } if (mSetup.parallelism == 0) { if (!mCellsWriter.isEmpty()) mClient.mutator_set_cells_serialized(mMutator, mCellsWriter.buffer(), true); else mClient.mutator_flush(mMutator); } } catch (Exception e) { e.printStackTrace(); log.severe(e.toString()); throw new IOException("Unable to set cell via thrift - " + e.toString()); } } else if (mSetup.type == Setup.Type.READ) { String row; SerializedCellsReader reader = new SerializedCellsReader(null); if (mSetup.parallelism != 0) { System.out.println("Parallel reads not implemented"); System.exit(1); } try { for (long i=task.start; i