Thursday 21 February 2013

Bulk Insert in Cassandra


Cassandra's bulk loading interfaces are most useful in two use cases: initial migration or restore from another datastore/cluster and regular ETL. Bulk loading assumes that it's important to you to avoid loading via the thrift interface, for example because you haven't integrated a client library yet or because throughput is critical.

In this blog we will see the two efficient ways of bulk loading in Cassandra.
1. By using Copy command to import CSV files into the database.
2. Copy command in Cassandra using java code.

Copy command for Cassandra bulk loading :


  •   Syntax for COPY FROM command

COPY <column family name> [ ( column [, ...] ) ] FROM ( '<filename>' | STDIN )
[ WITH <option>='value' [AND ...] ];


  •   Description of COPY FROM command 


Cassandra COPY FROM command option Using these options in a WITH clause, you can change the format of the CSV format. The following table describes these options:


COPY Option                        Default Value                                   Function 
DELIMITER                          Comma(,)                 Set the character that separates fields in the file. QUOTE                                 Quotation mark(“)       Set the character that encloses field values.
 ESCAPE                               Backslash(\)                Set the character that escapes literal uses of the QUOTE                                 False                           Set true to indicate that first row of the file is a header.


  1.  By default, when you use the COPY FROM command, Cassandra expects every row in the CSV input to contain the same number of columns. The number of columns in the CSV input is the same as the number of columns in the Cassandra table metadata. Cassandra assigns fields in the respective order. To apply your input data to a particular set of columns, specify the column names in parentheses after the table name.
  2.  COPY FROM command is only supported in Cassandra CQLSH environment, It will not work while using Cassandra CLI.

 IMPORTANT NOTE: COPY FROM is intended for importing small datasets (a few million rows or less) into Cassandra. For importing larger datasets, use Cassandra SSTableBulkLoader .


 Example :


Copy command in Cassandra using Java Code :


  •  Now we will see how we do the bulk load through java code.


  1.  Basically we will generate the copy command through java according to our need.
  2.  After generating the command we will write all those commands in a query file.
  3. As in Cassandra we can execute any query file use our command prompt or terminal for linux.

      Syntax : $CASSANDRA_HOME/bin/cqlsh -k <keyspace name > -f <filename>


   Now By using Java utility of running console commands we will execute the above command for
   doing the Copy Bulk insert in Cassandra.

   Java Code for performing bulk copy in Cassandra.


import java.io.BufferedReader;
import java.io.DataInputStream; 
import java.io.FileInputStream; 
import java.io.InputStreamReader;
import java.io.FileWriter; 
import java.io.BufferedWriter; 
public class Copycassandra { 
  public static void main(String[] args) { 
     try { 
            // directory for data CSV files 
File pathName = new File("/home/prashant/data/"); 
String[] fileNames = pathName.list();
FileInputStream fstream = new
FileInputStream("sampletables.txt"); 
// sampletables.txt file contains the table names in which we want 
// to do bulk copy
DataInputStream in = new DataInputStream(fstream); 
BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
String strLine, sql; 
FileWriter foutstream = new FileWriter("/home/prashant/queries.txt"); 
BufferedWriter out = new BufferedWriter(foutstream); 
    while ((strLine = br.readLine()) != null) 
             { 
               for (int i = 0; i < fileNames.length; i++) 
                 { 
                    if (strLine.equals(fileNames[i])) 
                         { 
sql = "copy " + strLine + " from '/home/prashant/data/"+ fileNames[i] + " WITH delimiter='|';"; out.write(sql + "\n"); 
                           } 
                 } 
         } 
out.close(); 
String sysEnvStr = System.getenv("CASSANDRA_HOME"); 
Process p = Runtime.getRuntime().exec(sysEnvStr+ "/bin/cqlsh -k idbench -f /home/prashant/queries.txt"); 
BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); 
BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream())); 
// read the output from the command 
String s = ""; 
while ((s = stdInput.readLine()) != null) 
  { 
     System.out.println(s); 
  } 
while ((s = stdError.readLine()) != null) 
  { 
     System.out.println("Std ERROR : " + s); 
  } 


catch (Exception e) 
{ e.printStackTrace();
 } 

}

Bulk-Loading Data to Cassandra with SSTable.


  • The 'sstableloader' introduced from Apache Cassandra 0.8.1 onwards, provides a powerful way to load huge volumes of data into a Cassandra cluster. If you are moving from a cloud cluster to a dedicated cluster or vice-versa or from a different database to Cassandra you will be interested in this tool. As shown below in whatever case if you can generate the 'sstable' from the data to be loaded into Cassandra, you can load it in bulk to the cluster using 'sstableloader'.
       
  •  Once you Have SSTable files generated from CSV files then you can use sstableloader utility provided by Cassandra.


  • Inside bin directory of Cassandra you can find this tool sstableloader. You can run it through command line pointing to the above generated sstables. Good guidance on that can be found in Datastax .
  • Also you can directly use the class 'org.apache.cassandra.tools.Bulkloader' in java code to load the sstables to a Cassandra cluster.
  • If you are testing all this in localhost, we run sstableloader from command line as follows,

                        - ./sstableloader -d localhost <path to generated sstables>