Incomplete insertion of rows into an HBase table

This topic contains 7 replies, has 2 voices, and was last updated by Divya Garg 8 months, 1 week ago.

  • Creator
    Topic
  • #47286

    Divya Garg
    Participant

    I have written a Java program that inserts rows into an existing table, to test the performance of HBase in a multi-threaded environment.
    The problem: if I push 4000 or more rows, only approximately 3800 rows are inserted, and I do not get any error. I am unable to figure out the reason for such a limited number of insertions.
    Also, if I decrease the number of qualifiers in each row, I am able to insert more rows, say up to 4300, but not more.
    This keeps happening: decreasing the qualifiers increases the insertions.
    So what I could figure out is that it is related to some byte size or memory limit.
    But I am really unable to solve the problem. Please help.

Viewing 7 replies - 1 through 7 (of 7 total)


  • Author
    Replies
  • #47432

    Divya Garg
    Participant

    Yes, you are correct: all the records are getting inserted, but the code does not increment `writes` correctly, irrespective of whether I flush after each record or not (the results are the same). I don't understand: if everything is right and in place, why is `writes` not incremented by the exact correct value? Here's my code:
    private static final String Table_Name = "channelJabong";
    int min;
    int max;
    private Configuration conf;

    public MultiThreadedHbase(int Min, int Max, Configuration conf) {
        min = Min;
        max = Max;
        this.conf = conf;
    }

    public static ArrayList<Integer> rows = new ArrayList<Integer>();
    private static int writes = 0;

    public void addRecord(String tableName, int rowKey, String family,
            String qualifier, String value, HTable table) throws Exception {
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
                    Bytes.toBytes(value));
            table.put(put);
            // table.flushCommits();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        String tablename = Table_Name;
        try {
            System.out.println("Min: " + min + " Max: " + max);
            HTable table = new HTable(conf, tablename);

            for (int i = min; i <= max; i++) {
                Random ran = new Random();
                int randomNum = ran.nextInt((max - min) + 1) + min;

                int row = i;
                System.out.println("row=" + row);
                rows.add(row);
                try {
                    System.out.println("writing ...");
                    addRecord(tablename, row, "channelFamily", "BrowserID",
                            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", table);
                    addRecord(tablename, row, "channelFamily", "Channel",
                            "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCfacebook", table);
                    addRecord(tablename, row, "channelFamily", "Category",
                            "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCMen", table);
                    addRecord(tablename, row, "channelFamily", "URL",
                            "http://stackoverflow.com/questions/12396170/hello", table);
                    addRecord(tablename, row, "channelFamily", "PageType",
                            "CCCCCCCCCCCCCCCCCCCProduct", table);
                    addRecord(tablename, row, "channelFamily", "Timestamp",
                            " 2010-09-27 09:04:44.000000000 -0400", table);
                    addRecord(tablename, row, "channelFamily", "email",
                            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb@gmail.com", table);

                    System.out.println("in progress ... " + writes
                            + " will append row: " + row);

                    addRecord(tablename, row, "channelFamily", "impressions",
                            "or more contributor license agreements. See the NOTICE file distributed with this", table);
                    writes++;

                    System.out.println(writes);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // table.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = null;
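    One plausible reason the count drifts in the snippet above: `writes++` on a plain static `int` is not atomic, so concurrent threads can interleave the read/add/write steps and lose increments. Below is a minimal, self-contained sketch (class name hypothetical, not from the thread) of the same counter pattern done safely with `java.util.concurrent.atomic.AtomicInteger`:

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    public class CounterDemo {
        // writes++ on a plain static int is a read-modify-write sequence;
        // two threads can read the same value and one increment is lost.
        // AtomicInteger performs the increment as a single atomic operation.
        static final AtomicInteger writes = new AtomicInteger(0);

        public static void main(String[] args) throws InterruptedException {
            Thread[] threads = new Thread[300];
            for (int t = 0; t < threads.length; t++) {
                threads[t] = new Thread(() -> {
                    for (int i = 0; i < 20; i++) {
                        writes.incrementAndGet(); // atomic, no lost updates
                    }
                });
                threads[t].start();
            }
            for (Thread t : threads) {
                t.join(); // wait for every worker before reading the total
            }
            System.out.println(writes.get()); // 300 threads * 20 = 6000
        }
    }
    ```

    With 300 threads doing 20 increments each, this always prints 6000; the plain-`int` version can print less.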

    #47356

    Enis Soztutar
    Participant

    Did you check the number of rows from HBase, or are you printing that info out on the client side? You can paste the code if you want.

    #47354

    Divya Garg
    Participant

    Thanks, Enis. I figured out that I was creating one HTable instance every time I added a single qualifier to a row, and that was the reason I was only able to make a limited number of inserts, up to approximately 3800, irrespective of the number of insertions attempted (4000 or 20000). Though I am not aware of why that caused it.

    After correcting this and making one HTable instance per row, so that every qualifier in a row goes through the same HTable instance, the problem was solved and the insertions went up. (I also tried one HTable instance per thread, with the same results.) So that is how the limited-insertion problem was solved.

    However, there is still some instability. In spite of flushing after adding each record, sometimes a few rows are missing from the table, and sometimes the insertions are complete.
    Below are the results of one run, in which 6000 rows should have been added but only 5994 were written:
    Threads: 300
    Writes per thread: 20
    Total time: 8.0 s
    Total writes: 5994
    Writes/sec: 749
    Expected: 6000

    #47347

    Enis Soztutar
    Participant

    The problem is that you are not flushing or closing the table properly. HTable buffers writes on the client side so it can send them in mini-batches. Unless the buffer is full, or you call HTable.flushCommits() or HTable.close(), the writes won't be sent to the server. You may want to flush every time you add a record, or do your own batching. Also, HTable instantiation is a costly operation; you want to keep an HTable instance around rather than reconstruct one every time.
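    A minimal sketch of that batching pattern, against the 0.94-era client API used in this thread (it needs a running HBase cluster and the HBase client jar on the classpath, so it cannot run standalone; the table and family names are the poster's, the class name is hypothetical):

    ```java
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedWriter {
        // Writes rows [min, max] through ONE HTable instance, letting the
        // client-side write buffer batch the puts instead of sending one
        // RPC per put.
        public static void writeRange(Configuration conf, int min, int max)
                throws IOException {
            HTable table = new HTable(conf, "channel1"); // created once, reused
            table.setAutoFlush(false);                   // buffer puts client-side
            try {
                for (int row = min; row <= max; row++) {
                    Put put = new Put(Bytes.toBytes(row));
                    put.add(Bytes.toBytes("channelFamily"),
                            Bytes.toBytes("Channel"), Bytes.toBytes("facebook"));
                    table.put(put);                      // queued in the write buffer
                }
                table.flushCommits();                    // push any buffered puts
            } finally {
                table.close();                           // close() also flushes
            }
        }
    }
    ```

    The key points are the single long-lived HTable instance and the explicit flush/close; without them, puts sitting in the write buffer when the JVM exits are simply lost, which matches the "a few rows short" symptom.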

    #47327

    Divya Garg
    Participant

    Assuming the table already exists, this is the code:
    package Hbase;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import javax.sql.RowSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.avro.generated.HBase;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiThreadedHbase implements Runnable {
        private static final String Table_Name = "channel1";
        int min;
        int max;
        private Configuration conf;

        public MultiThreadedHbase(int Min, int Max, Configuration conf) {
            min = Min;
            max = Max;
            this.conf = conf;
        }

        public static ArrayList<Integer> rows = new ArrayList<Integer>();
        private static int writes = 0;

        public void addRecord(String tableName, int rowKey, String family,
                String qualifier, String value) throws Exception {
            try {
                // A new HTable instance is created here for every single
                // qualifier -- this turned out to be the cause of the
                // limited insertions discussed in this thread.
                HTable table = new HTable(conf, tableName);
                Put put = new Put(Bytes.toBytes(rowKey));
                put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier),
                        Bytes.toBytes(value));
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            System.out.println("Min: " + min + " Max: " + max);
            for (int i = min; i <= max; i++) {
                String tablename = Table_Name;
                Random ran = new Random();
                int randomNum = ran.nextInt((max - min) + 1) + min;

                int row = i;
                System.out.println("row=" + row);
                rows.add(row);
                try {
                    System.out.println("writing ...");
                    addRecord(tablename, row, "channelFamily", "BrowserID",
                            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
                    addRecord(tablename, row, "channelFamily", "Channel",
                            "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCfacebook");
                    addRecord(tablename, row, "channelFamily", "Category",
                            "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCMen");
                    addRecord(tablename, row, "channelFamily", "URL",
                            "http://stackoverflow.com/questions/12396170/hello");
                    addRecord(tablename, row, "channelFamily", "PageType",
                            "CCCCCCCCCCCCCCCCCCCProduct");
                    addRecord(tablename, row, "channelFamily", "Timestamp",
                            " 2010-09-27 09:04:44.000000000 -0400");
                    // addRecord(tablename, row, "channelFamily", "email",
                    //         "bbbbbbbbbbbbbbbbbbbbb@gmail.com");

                    System.out.println("in progress ... " + writes
                            + " will append row: " + row);
    #47316

    Divya Garg
    Participant

    Hi Enis,
    Thanks for the reply.

    I am already creating a new instance of HTable every time a new row is inserted into the table, so I do not think that's the problem.
    Also, if I insert fewer rows, then all the rows make it into the table. And on dropping any of the qualifiers, the limit up to which rows can be inserted increases.
    It seems to be a different problem, something related to the number of bytes that can be written to the table. (Also, I am working in a pseudo-distributed environment, on my laptop.)

    #47308

    Enis Soztutar
    Participant

    You may be having concurrency problems due to the way you use the HTable API. HTable is not thread-safe, so each thread should have its own instance.
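    A sketch of that thread-confinement structure (class name illustrative; it needs a running cluster and the HBase client jar, so it cannot run standalone): each worker constructs its own HTable inside run() and never shares it, while the Configuration object can safely be shared.

    ```java
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowRangeWorker implements Runnable {
        private final Configuration conf; // sharing the conf across threads is fine
        private final int min, max;

        public RowRangeWorker(Configuration conf, int min, int max) {
            this.conf = conf;
            this.min = min;
            this.max = max;
        }

        @Override
        public void run() {
            try {
                // Constructed inside run(), so this HTable is visible to
                // exactly one thread -- no cross-thread sharing.
                HTable table = new HTable(conf, "channel1");
                try {
                    for (int row = min; row <= max; row++) {
                        Put put = new Put(Bytes.toBytes(row));
                        put.add(Bytes.toBytes("channelFamily"),
                                Bytes.toBytes("Channel"), Bytes.toBytes("facebook"));
                        table.put(put);
                    }
                } finally {
                    table.close(); // flushes this thread's buffered puts
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    ```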
