Connecting Hive and R via MySQL using Sqoop

Rrrrrr – because pirates > ninjas

This is all kind of backwards: why would I want to take data out of cascalog/HDFS into MySQL and then back into HDFS/Hive? Anyway, the general aim here is to get some data from Hive into R and do something with it.

Step 1: Get Data
In this first step we grab some data from an existing MySQL database, created when doing a demo of cascalog, and load it into Hive using Sqoop, a tool designed to move large quantities of data between HDFS and structured datastores.

Download Sqoop, cd into its bin/ directory and run:

./sqoop import --connect jdbc:mysql://localhost/cascalog --username dan --table unique_user_counts -m 1 --hive-import

This imports the unique_user_counts table from the cascalog MySQL database. The --hive-import switch places the data into Hive (I installed Hive using Homebrew), and the -m switch sets the number of map tasks the job will use, in this instance just 1.

You can take a peek at the data using the Hive repl.

λ bin  hive
hive> show databases;
Time taken: 2.48 seconds
hive> use default;
Time taken: 0.014 seconds
hive> show tables;
Time taken: 0.286 seconds
hive> select * from unique_user_counts;

This dumps out the data. To find out more about a table's columns, use the describe command, e.g. describe unique_user_counts;

To get data into R, Hive must be running in server mode, which can be achieved with:

hive --service hiveserver
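Once HiveServer is up, anything that speaks JDBC can query it, not just R. As a point of comparison, here is a minimal Scala sketch of the same connection and query the R code performs. It assumes the HiveServer1-era driver class org.apache.hadoop.hive.jdbc.HiveDriver and the jdbc:hive:// URL used in the R snippet, HiveServer's default port 10000, and that client is a string column and count an integer column; the object and method names are made up for illustration.

```scala
import java.sql.{Connection, DriverManager}

object HiveJdbcSketch {
  // Driver class as used in the R example (HiveServer1, Hive 0.9)
  val DriverClass = "org.apache.hadoop.hive.jdbc.HiveDriver"

  // Build a HiveServer1 JDBC URL; 10000 is HiveServer's default port
  def hiveUrl(host: String, port: Int = 10000, database: String = "default"): String =
    s"jdbc:hive://$host:$port/$database"

  // Open a connection; needs the Hive and Hadoop jars on the classpath
  // and `hive --service hiveserver` running
  def connect(host: String): Connection = {
    Class.forName(DriverClass)
    DriverManager.getConnection(hiveUrl(host), "", "")
  }

  // Same query as the R code; assumes client is text and count is an integer
  def clientCounts(con: Connection): List[(String, Long)] = {
    val stmt = con.createStatement()
    try {
      val rs = stmt.executeQuery("select client, count from unique_user_counts")
      val buf = scala.collection.mutable.ListBuffer.empty[(String, Long)]
      while (rs.next()) buf += ((rs.getString(1), rs.getLong(2)))
      buf.toList
    } finally stmt.close()
  }
}
```

Only connect and clientCounts touch the server; hiveUrl is plain string building, so it can be checked on its own.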

The final step is R …


library(RJDBC)

# collect every jar from the Hadoop and Hive installs (Homebrew paths)
hadoop_jars <- list.files("/usr/local/Cellar/hadoop/1.1.1/libexec", pattern="jar$", full.names=TRUE)
hive_jars <- list.files("/usr/local/Cellar/hive/0.9.0/libexec/lib", pattern="jar$", full.names=TRUE)

hive_driver <- JDBC("org.apache.hadoop.hive.jdbc.HiveDriver", c(hive_jars, hadoop_jars))

# HiveServer listens on port 10000 by default
hive_conn <- dbConnect(hive_driver, "jdbc:hive://localhost:10000/default")

rs <- dbGetQuery(hive_conn, "select client, count from unique_user_counts")

x <- seq_along(rs$count)
plot(x, rs$count)

This gives me a very boring plot of no real interest; it’s only marginally more interesting than doing a select count(*) from …

The *interesting* part here is the definition of the jars required to connect to Hive, which is largely identical to a previous post.
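Since the jar list is the fiddly part, here is a rough Scala equivalent of the two list.files calls, for building the same classpath outside R. It is only a sketch: JarClasspath, jarsIn and classpath are hypothetical names, and matching on ".jar" is slightly stricter than R's "jar$" regex.

```scala
import java.io.File

object JarClasspath {
  // List the *.jar files directly inside `dir` (non-recursive, like
  // list.files(dir, pattern = "jar$", full.names = TRUE)), sorted by path.
  // A missing directory yields an empty list rather than an exception.
  def jarsIn(dir: File): List[String] = {
    val files = Option(dir.listFiles()).map(_.toList).getOrElse(Nil)
    files.filter(f => f.isFile && f.getName.endsWith(".jar")).map(_.getPath).sorted
  }

  // Join the jars from several directories into one classpath string
  def classpath(dirs: File*): String =
    dirs.flatMap(jarsIn).mkString(File.pathSeparator)
}
```

For example, JarClasspath.classpath(new File("/usr/local/Cellar/hive/0.9.0/libexec/lib"), new File("/usr/local/Cellar/hadoop/1.1.1/libexec")) produces a string that could be passed to scala -cp.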

cast data in mysql

Want to convert data from one type into another within MySQL? Try CAST!

select id, cast(sequence as CHAR(1000) character set utf8) from my_table;

The cast is needed in this case because the data is stored as a BLOB (raw bytes) rather than in a TEXT or simple VARCHAR column, either of which would do the job. I am not responsible for this.
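If you would rather leave the column alone and decode on the client, the equivalent is to interpret the bytes as UTF-8 yourself. A minimal Scala sketch, assuming the BLOB really holds UTF-8 text; asUtf8 is a hypothetical helper that also mimics the CHAR(1000) limit, which MySQL applies in characters rather than bytes.

```scala
import java.nio.charset.StandardCharsets

object BlobText {
  // Decode raw BLOB bytes as UTF-8 and keep at most maxChars characters
  // (counted as Unicode code points), roughly what
  // CAST(col AS CHAR(1000) CHARACTER SET utf8) does on the server
  def asUtf8(bytes: Array[Byte], maxChars: Int = 1000): String = {
    val s = new String(bytes, StandardCharsets.UTF_8)
    if (s.codePointCount(0, s.length) <= maxChars) s
    else s.substring(0, s.offsetByCodePoints(0, maxChars))
  }
}
```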

Use R to access a mysql db

It’s fairly simple to connect to a MySQL database using R.

You may need to install the RMySQL package first, either through the Package Manager or with install.packages("RMySQL").

# load the MySQL interface (RMySQL also loads DBI)
library(RMySQL)
m <- dbDriver("MySQL")

# which database do you want to connect to?
# this is much like Python and Perl database connections
con <- dbConnect(m, password="your-passwd", dbname="your-db")

# prepare and send a query
rs <- dbSendQuery(con, statement = "select x from y where x > 10")

# get all the data. n = -1 means return all rows
data <- fetch(rs, n = -1)

# now do something with the data ...

# release the result set and close the connection
dbClearResult(rs)
dbDisconnect(con)

It’s that simple. However, like many things, the more you look at it the more you realise you don’t know. To explore further open R and type ??rmysql or ??mysql.
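The n = -1 convention is easy to mirror if you ever do the same thing over plain JDBC. This Scala sketch illustrates the semantics; it is not how RMySQL implements fetch. rows wraps a java.sql.ResultSet as a lazy iterator, and fetch returns either every row (negative n) or at most n rows. Both names are hypothetical.

```scala
import java.sql.ResultSet

object Fetch {
  // Wrap a ResultSet as a lazy iterator; each row is read with `read`
  // before the cursor is advanced again, so the mutable ResultSet is safe
  def rows[A](rs: ResultSet)(read: ResultSet => A): Iterator[A] =
    Iterator.continually(rs).takeWhile(_.next()).map(read)

  // Mimic fetch(rs, n): any negative n returns everything, otherwise at most n rows
  def fetch[A](rows: Iterator[A], n: Int): List[A] =
    if (n < 0) rows.toList else rows.take(n).toList
}
```

With a live result set this would be used as Fetch.fetch(Fetch.rows(rs)(_.getInt("x")), -1).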

Update MySQL field required status.

It’s been a while since I did MySQL stuff on the command line, but thanks to my current goofy project it’s getting less infrequent. To change whether a field (column) in a table is required, use the following:

alter table <table_name> modify <column_name> <column_type> <status>;

where column_type is one of the MySQL types, e.g. varchar(700), and status is either not null (required) or null (optional). MODIFY redefines the whole column, so repeat any other attributes you want to keep, such as a DEFAULT or COMMENT:

alter table my_special_friends modify twitter_id varchar(145) null;
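When several columns need the same treatment, generating the statements can save some typing. A small Scala sketch of the template above; modifyNullability is a hypothetical helper, and because it does no quoting or escaping it should only be given trusted table and column names.

```scala
object AlterNullability {
  // Fill in "alter table <t> modify <col> <type> <status>;"
  // required = true maps to NOT NULL, false to NULL
  def modifyNullability(table: String, column: String, columnType: String, required: Boolean): String = {
    val status = if (required) "not null" else "null"
    s"alter table $table modify $column $columnType $status;"
  }
}
```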

SBT – scalatest + jdbc

In the sbt project definition (project/build/ProjectName.scala, the sbt 0.7 layout) add the following to pull in ScalaTest and the MySQL JDBC driver:

import sbt._

class ProjectName(info: ProjectInfo) extends DefaultProject(info) {
  // repository hosting the ScalaTest snapshot build
  val scalaToolsSnapshots = ScalaToolsSnapshots
  val scalatest = "org.scalatest" % "scalatest" % "1.0.1-for-scala-2.8.0.RC1-SNAPSHOT"
  val mysql = "mysql" % "mysql-connector-java" % "5.1.12" % "compile"
}

Then run sbt update. This should get ScalaTest and the MySQL driver up and running with SBT.
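A quick way to check that sbt update really put the connector on the classpath is to try loading the driver class. This Scala sketch assumes the Connector/J 5.1 driver class name com.mysql.jdbc.Driver; DriverCheck and onClasspath are hypothetical names.

```scala
import scala.util.Try

object DriverCheck {
  // True if the class can be loaded, i.e. the jar providing it is on the
  // classpath; ClassNotFoundException is caught by Try and reported as false
  def onClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  // Connector/J 5.1 ships its driver under this name
  val mysqlDriverPresent: Boolean = onClasspath("com.mysql.jdbc.Driver")
}
```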

MySQL, scala and BLOB

First download the MySQL JDBC driver (Connector/J).

To work in the Scala REPL, start it with the driver jar on the classpath:

-=[biomunky@playtime svn]=-  scala -cp path/to/mysql-connector-java-5.1.13/mysql-connector-java-5.1.13-bin.jar

Import the required classes from java.sql:

import java.sql.{Connection, DriverManager, SQLException, ResultSet}

The connection string to the database should look something like this:

val conString = "jdbc:mysql://localhost:3306/DBNAME?user=UID&password=PASSWD"
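Embedding credentials in the URL works, but a password containing & or = can confuse the query-string parsing. A sketch of the alternative, passing user and password as java.util.Properties to DriverManager.getConnection; the host, port 3306 and all names here are placeholders or assumptions.

```scala
import java.sql.{Connection, DriverManager}
import java.util.Properties

object MySqlConnect {
  // URL without credentials; host, port and database name are placeholders
  def mysqlUrl(host: String, db: String, port: Int = 3306): String =
    s"jdbc:mysql://$host:$port/$db"

  // Standard JDBC "user"/"password" properties, so the password never has
  // to be escaped into the URL
  def credentials(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  def connect(host: String, db: String, user: String, password: String): Connection =
    DriverManager.getConnection(mysqlUrl(host, db), credentials(user, password))
}
```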

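Then load the driver and open the connection:

Class.forName("com.mysql.jdbc.Driver") // register Connector/J; JDBC 4 drivers can also self-register
val con = DriverManager.getConnection(conString)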

If you’re working in a script, class or similar, it’s worth wrapping the work in a try/catch/finally block so the connection is always closed:

try {
    // do some stuff
} catch {
    case e: SQLException => println("A problem! " + e.getMessage)
} finally {
    println("Closing the con")
    con.close()
}
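The try/finally pattern can be packaged once and reused for connections, statements and result sets, all of which implement java.lang.AutoCloseable. A minimal loan-pattern sketch in Scala; withResource is a hypothetical name, and Scala 2.13 ships a similar scala.util.Using.

```scala
object Loan {
  // Run `body` with the resource and always close it afterwards, even if
  // `body` throws: the same guarantee as a hand-written try/finally
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()
}
```

One design caveat: if close() itself throws, its exception replaces the one from body; scala.util.Using handles that case by attaching suppressed exceptions.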

This isn’t really required in the REPL, at least not for what I’m doing. The data I’m after is stored as a BLOB, and all I wanted to do was dump it to a file (not what the final code will do).

val handle = con.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val rs     = handle.executeQuery("select my_blob from BLOBS limit 1")

The first line creates a Statement whose result sets can only be read forwards and cannot be used to update the table; the second pulls a single BLOB entry from the table into a java.sql.ResultSet. As I’m not familiar with the classes available for processing the data, I went with the following, which seemed suitably functional:

while (rs.next()) {
    val theBlob = rs.getBlob("my_blob") // for more options see the java.sql.Blob JavaDoc
    // ... work with theBlob here
}
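Blob positions in JDBC are 1-based, which is why the snippets use getBytes(1, ...). A small Scala sketch that copies a whole BLOB into an array with that offset and a size check, since getBytes takes an Int length; readAll is a hypothetical name. javax.sql.rowset.serial.SerialBlob, a Blob implementation from the JDK, is handy for trying it without a database.

```scala
import java.sql.Blob

object BlobBytes {
  // Copy an entire Blob into memory. The first byte is at position 1, and
  // getBytes takes an Int length, so refuse anything over Int.MaxValue bytes
  def readAll(blob: Blob): Array[Byte] = {
    val len = blob.length()
    require(len <= Int.MaxValue, s"BLOB too large to copy into one array: $len bytes")
    blob.getBytes(1, len.toInt)
  }
}
```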

To write the BLOB to a file, add the following inside the while loop (below theBlob); the output path is just an example:

val outputFile = new File("/tmp/my_blob.bin") // hypothetical output path
val outhandle = new BufferedOutputStream(new FileOutputStream(outputFile))
try outhandle.write(theBlob.getBytes(1, theBlob.length.toInt), 0, theBlob.length.toInt)
finally outhandle.close()
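For large BLOBs, reading the whole value into an array first can be avoided by streaming it straight to disk. A Scala sketch using Blob.getBinaryStream and java.nio.file.Files.copy; BlobToFile.dump is a hypothetical name, and the target path is whatever you choose.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import java.sql.Blob

object BlobToFile {
  // Stream the BLOB to `target` without materialising it as one byte array.
  // REPLACE_EXISTING overwrites an older dump; returns the bytes written.
  def dump(blob: Blob, target: Path): Long = {
    val in = blob.getBinaryStream()
    try Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
  }
}
```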

As an alternative, coerce the BLOB into a list of lines:

val byteStream = new ByteArrayOutputStream()
byteStream.write(theBlob.getBytes(1, theBlob.length.toInt), 0, theBlob.length.toInt)
byteStream.toString("UTF-8").split("\n").toList.foreach(println)
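An alternative that skips the intermediate ByteArrayOutputStream: decode the binary stream as UTF-8 and let scala.io.Source split it into lines. This assumes the BLOB holds UTF-8 text; BlobLines is a hypothetical name. Unlike split("\n"), getLines also strips the \r from Windows line endings.

```scala
import java.sql.Blob
import scala.io.Source

object BlobLines {
  // Decode the BLOB's bytes as UTF-8 and return its lines without terminators
  def lines(blob: Blob): List[String] = {
    val src = Source.fromInputStream(blob.getBinaryStream(), "UTF-8")
    try src.getLines().toList finally src.close()
  }
}
```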

Both of the previous snippets rely on the following import:

import java.io.{BufferedOutputStream, FileOutputStream, File, ByteArrayOutputStream}

Then close the database connection and make a cup of tea.