Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
If you are a developer or an application programmer who intends to modify Sqoop or build an extension using one of Sqoop’s internal APIs, you should read this document. The following sections describe the purpose of each API, where internal APIs are used, and which APIs are necessary for implementing support for additional databases.
Apache Sqoop is an open source software product of The Apache Software Foundation. Development for Sqoop occurs at http://sqoop.apache.org. At that site, you can obtain Sqoop's latest releases and source code, its issue tracker, and its wiki documentation.
The following prerequisite knowledge is required for working with Sqoop:
Software development in Java
This document assumes you are using a Linux or Linux-like environment. If you are using Windows, you may be able to use cygwin to accomplish most of the following tasks. If you are using Mac OS X, you should see few (if any) compatibility errors. Sqoop is predominantly operated and tested on Linux.
You can obtain the source code for Sqoop using the following command:

git clone https://git-wip-us.apache.org/repos/asf/sqoop.git

Sqoop source code is held in a git repository. Instructions for retrieving source from the repository are provided at: TODO provide a page in the web site.

Compilation instructions are provided in the COMPILING.txt file in the root of the source repository.
This section specifies the APIs available to application writers who want to integrate with Sqoop, and those who want to modify Sqoop.
The next three subsections are written for the following use cases: using classes generated by Sqoop and its public library, writing Sqoop extensions that add support for more databases, and modifying Sqoop's internals.
Each section describes the system in successively greater depth.
Sqoop automatically generates classes that represent the tables imported into the Hadoop Distributed File System (HDFS). The class contains member fields for each column of the imported table; an instance of the class holds one row of the table. The generated classes implement the serialization APIs used in Hadoop, namely the Writable and DBWritable interfaces. They also contain other convenience methods, such as a parse() method that interprets delimited text fields and a toString() method that re-emits the row as delimited text.
The full set of methods guaranteed to exist in an auto-generated class is specified in the abstract class com.cloudera.sqoop.lib.SqoopRecord.

Instances of SqoopRecord may depend on Sqoop's public API, which comprises all classes in the com.cloudera.sqoop.lib package. These are briefly described below. Clients of Sqoop should not need to interact with any of these classes directly, although classes generated by Sqoop will depend on them. Therefore, these APIs are considered public, and care will be taken when forward-evolving them.
The RecordParser class will parse a line of text into a list of fields, using controllable delimiters and quote characters.

The FieldFormatter class provides a method which handles quoting and escaping of characters in a field; it is used in SqoopRecord.toString() implementations.

The JdbcWritableBridge class marshals data between JDBC ResultSet and PreparedStatement objects and the fields of a SqoopRecord.

The BigDecimalSerializer class contains a pair of methods that facilitate serialization of BigDecimal objects over the Writable interface.
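To see how the generated code and the public library fit together, here is a minimal, hedged sketch. CustomerRecord stands in for a hypothetical class produced by sqoop codegen; the SqoopRecord methods used (parse(), getFieldMap(), setField(), toString()) are the convenience methods described above, but verify their exact signatures against the SqoopRecord class in your Sqoop version.

import java.util.Map;

import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.SqoopRecord;

public class GeneratedRecordExample {

  // "record" would be an instance of a class generated by "sqoop codegen"
  // (e.g., a hypothetical CustomerRecord); all generated classes extend
  // SqoopRecord, so this sketch is written against the base type.
  public static void reemit(SqoopRecord record)
      throws RecordParser.ParseError {
    // parse() interprets a delimited line of text and fills the member fields.
    record.parse("1,Some Customer\n");

    // getFieldMap() exposes the column values as a column-name -> value map.
    Map<String, Object> fields = record.getFieldMap();
    System.out.println("id column: " + fields.get("id"));

    // setField() updates a single column by name; toString() re-serializes
    // the row using the record's delimiters.
    record.setField("name", "Renamed Customer");
    System.out.println(record.toString());
  }
}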
The full specification of the public API is available on the Sqoop Development Wiki as SIP-4.
This section covers the API and primary classes used by extensions to Sqoop which allow Sqoop to interface with more database vendors.

While Sqoop uses JDBC and DataDrivenDBInputFormat to read from databases, differences in the SQL supported by different vendors, as well as in JDBC metadata, necessitate vendor-specific code paths for most databases. Sqoop addresses this problem with the ConnManager API (com.cloudera.sqoop.manager.ConnManager).
ConnManager is an abstract class defining all methods that interact with the database itself. Most implementations of ConnManager will extend the com.cloudera.sqoop.manager.SqlManager abstract class, which uses standard SQL to perform most actions. Subclasses are required to implement the getConnection() method, which returns the actual JDBC connection to the database. Subclasses are free to override all other methods as well. The SqlManager class itself exposes a protected API that allows developers to selectively override behavior. For example, the getColNamesQuery() method allows the SQL query used by getColNames() to be modified without needing to rewrite the majority of getColNames().
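As an illustration, a bare-bones manager for a hypothetical vendor might look like the sketch below. The class and driver names are invented, and depending on your Sqoop version you may need to implement one or two additional abstract methods; treat this as a starting point rather than a complete manager.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.SqlManager;

public class ExampleDbManager extends SqlManager {

  // Hypothetical JDBC driver class for the vendor being added.
  public static final String DRIVER_CLASS = "com.example.jdbc.Driver";

  private Connection connection;

  public ExampleDbManager(final SqoopOptions opts) {
    super(opts);
  }

  @Override
  public String getDriverClass() {
    return DRIVER_CLASS;
  }

  @Override
  public Connection getConnection() throws SQLException {
    // Lazily open a single JDBC connection using the connect string,
    // username and password supplied on the Sqoop command line.
    if (null == connection) {
      try {
        Class.forName(DRIVER_CLASS);
      } catch (ClassNotFoundException cnfe) {
        throw new SQLException("Could not load JDBC driver", cnfe);
      }
      connection = DriverManager.getConnection(options.getConnectString(),
          options.getUsername(), options.getPassword());
    }
    return connection;
  }

  @Override
  protected String getColNamesQuery(String tableName) {
    // Example of selectively overriding SqlManager behavior: supply a
    // vendor-specific query that returns column metadata without rows.
    return "SELECT * FROM " + escapeTableName(tableName) + " WHERE 1 = 0";
  }
}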
ConnManager implementations receive a lot of their configuration data from a Sqoop-specific class, SqoopOptions. SqoopOptions are mutable. SqoopOptions does not directly store specific per-manager options. Instead, it contains a reference to the Configuration returned by Tool.getConf() after parsing command-line arguments with the GenericOptionsParser. This allows extension arguments via "-D any.specific.param=any.value" without requiring any layering of options parsing or modification of SqoopOptions. This Configuration forms the basis of the Configuration passed to any MapReduce Job invoked in the workflow, so that users can set on the command-line any necessary custom Hadoop state.
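For example, extension code can read such a parameter straight from the Configuration held by SqoopOptions. The property name below is invented for illustration:

import org.apache.hadoop.conf.Configuration;

import com.cloudera.sqoop.SqoopOptions;

public class ExtensionConfigExample {

  // Hypothetical property, set by the user with
  //   sqoop import -D example.manager.fetch.size=5000 ...
  public static final String FETCH_SIZE_KEY = "example.manager.fetch.size";

  public static int getFetchSize(SqoopOptions options) {
    Configuration conf = options.getConf();
    // Falls back to 1000 when the user did not set the property.
    return conf.getInt(FETCH_SIZE_KEY, 1000);
  }
}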
All existing ConnManager implementations are stateless. Thus, the system which instantiates ConnManagers may create multiple instances of the same ConnManager class over Sqoop's lifetime. It is currently assumed that instantiating a ConnManager is a lightweight operation and is done reasonably infrequently. Therefore, ConnManagers are not cached between operations.
ConnManagers are currently created by instances of the abstract class ManagerFactory (see http://issues.apache.org/jira/browse/MAPREDUCE-750). One ManagerFactory implementation currently serves all of Sqoop: com.cloudera.sqoop.manager.DefaultManagerFactory. Extensions should not modify DefaultManagerFactory. Instead, an extension-specific ManagerFactory implementation should be provided with the new ConnManager. ManagerFactory has a single method of note, named accept(). This method will determine whether it can instantiate a ConnManager for the user's SqoopOptions. If so, it returns the ConnManager instance. Otherwise, it returns null.
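A hypothetical factory for the ExampleDbManager sketched earlier might look as follows. Note that the accept() signature has changed across Sqoop versions (newer versions pass a JobData object that wraps the SqoopOptions), so check the ManagerFactory class in your tree before copying this:

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ManagerFactory;
import com.cloudera.sqoop.metastore.JobData;

public class ExampleDbManagerFactory extends ManagerFactory {

  @Override
  public ConnManager accept(JobData data) {
    SqoopOptions options = data.getSqoopOptions();
    String connectString = options.getConnectString();
    // Claim only connect strings for the (hypothetical) exampledb scheme;
    // returning null lets other factories, including DefaultManagerFactory,
    // examine the options instead.
    if (connectString != null && connectString.startsWith("jdbc:exampledb:")) {
      return new ExampleDbManager(options);
    }
    return null;
  }
}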
The ManagerFactory implementations used are governed by the sqoop.connection.factories setting in sqoop-site.xml. Users of extension libraries can install the third-party library containing a new ManagerFactory and ConnManager(s), and configure sqoop-site.xml to use the new ManagerFactory. The DefaultManagerFactory principally discriminates between databases by parsing the connect string stored in SqoopOptions.
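Registering the hypothetical factory from the sketch above would then amount to an entry like the following in sqoop-site.xml (the class name and description text are illustrative):

<property>
  <name>sqoop.connection.factories</name>
  <value>com.example.sqoop.manager.ExampleDbManagerFactory</value>
  <description>A comma-delimited list of ManagerFactory implementations
    which are consulted, in order, to instantiate a ConnManager for
    the job.
  </description>
</property>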
Extension authors may make use of classes in the com.cloudera.sqoop.io, mapreduce, and util packages to facilitate their implementations. These packages and classes are described in more detail in the following section.
Sqoop supports imports from databases to HBase. When copying data into HBase, it must be transformed into a format HBase can accept: rows are inserted into an HBase table, the input columns are mapped to cells within a single column family, and cell values are serialized to byte arrays. All of this is done via Put operations in the HBase client API.
Sqoop's interaction with HBase is performed in the com.cloudera.sqoop.hbase package. Records are deserialized from the database and emitted from the mapper. The OutputFormat is responsible for inserting the results into HBase. This is done through an interface called PutTransformer. The PutTransformer has a method called getPutCommand() that takes as input a Map<String, Object> representing the fields of the dataset. It returns a List<Put> describing how to insert the cells into HBase.

The default PutTransformer implementation is the ToStringPutTransformer, which uses the string-based representation of each field to serialize the fields to HBase.

You can override this implementation by implementing your own PutTransformer and adding it to the classpath for the map tasks (e.g., with the -libjars option). To tell Sqoop to use your implementation, set the sqoop.hbase.insert.put.transformer.class property to identify your class with -D.
Within your PutTransformer implementation, the specified row key column and column family are available via the getRowKeyColumn() and getColumnFamily() methods. You are free to make additional Put operations outside these constraints; for example, to inject additional rows representing a secondary index. However, Sqoop will execute all Put operations against the table specified with --hbase-table.
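As a hedged sketch of such an override, the transformer below upper-cases the column qualifiers before writing them. The class name is hypothetical, and the Put.add() call shown is the pre-1.0 HBase client API (newer clients use addColumn()); adjust for the HBase version on your classpath.

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cloudera.sqoop.hbase.PutTransformer;

public class UppercasePutTransformer extends PutTransformer {

  @Override
  public List<Put> getPutCommand(Map<String, Object> fields) throws IOException {
    // The row key column and column family configured on the command line.
    Object rowKey = fields.get(getRowKeyColumn());
    if (null == rowKey) {
      return Collections.emptyList(); // nothing to insert for this record
    }
    byte[] family = Bytes.toBytes(getColumnFamily());

    Put put = new Put(Bytes.toBytes(rowKey.toString()));
    for (Map.Entry<String, Object> field : fields.entrySet()) {
      if (field.getKey().equals(getRowKeyColumn()) || null == field.getValue()) {
        continue;
      }
      // Illustrative transformation: upper-case the column names used as
      // HBase qualifiers before storing the string form of each value.
      put.add(family, Bytes.toBytes(field.getKey().toUpperCase()),
          Bytes.toBytes(field.getValue().toString()));
    }
    return Collections.singletonList(put);
  }
}

It would then be selected at import time by passing -D sqoop.hbase.insert.put.transformer.class=<your fully qualified class name>, with -libjars pointing at the jar containing it.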
This section describes the internal architecture of Sqoop.
The Sqoop program is driven by the com.cloudera.sqoop.Sqoop main class. A limited number of additional classes are in the same package: SqoopOptions (described earlier) and ConnFactory (which manipulates ManagerFactory instances).
The general program flow is as follows:
com.cloudera.sqoop.Sqoop is the main class and implements Tool. A new instance is launched with ToolRunner. The first argument to Sqoop is a string identifying the name of a SqoopTool to run. The SqoopTool itself drives the execution of the user's requested operation (e.g., import, export, codegen, etc.). The SqoopTool API is specified fully in SIP-1.

The chosen SqoopTool will parse the remainder of the arguments, setting the appropriate fields in the SqoopOptions class. It will then run its body.
In the SqoopTool's run() method, the import or export or other action proper is executed. Typically, a ConnManager is then instantiated based on the data in the SqoopOptions. The ConnFactory is used to get a ConnManager from a ManagerFactory; the mechanics of this were described in an earlier section. Imports, exports, and other large data motion tasks typically run a MapReduce job to operate on a table in a parallel, reliable fashion. An import does not specifically need to be run via a MapReduce job; the ConnManager.importTable() method is left to determine how best to run the import. Each main action is actually controlled by the ConnManager, except for code generation, which is done by the CompilationManager and ClassWriter (both in the com.cloudera.sqoop.orm package). Importing into Hive is also taken care of via the com.cloudera.sqoop.hive.HiveImport class after importTable() has completed. This is done without concern for the ConnManager implementation used.
A ConnManager's importTable() method receives a single argument of type ImportJobContext which contains parameters to the method. This class may be extended with additional parameters in the future, which optionally further direct the import operation. Similarly, the exportTable() method receives an argument of type ExportJobContext. These classes contain the name of the table to import/export, a reference to the SqoopOptions object, and other related data.
The following subpackages under com.cloudera.sqoop exist:

hive - Facilitates importing data to Hive.
io - Implementations of java.io.* interfaces (namely, OutputStream and Writer).
lib - The external public API (described earlier).
manager - The ConnManager and ManagerFactory interfaces and their implementations.
mapreduce - Classes interfacing with the new (0.20+) MapReduce API.
orm - Code auto-generation.
tool - Implementations of SqoopTool.
util - Miscellaneous utility classes.
The io package contains OutputStream and BufferedWriter implementations used by direct writers to HDFS. The SplittableBufferedWriter allows a single BufferedWriter to be opened to a client which will, under the hood, write to multiple files in series as they reach a target threshold size. This allows unsplittable compression libraries (e.g., gzip) to be used in conjunction with Sqoop import while still allowing subsequent MapReduce jobs to use multiple input splits per dataset. The large object file storage (see SIP-3) system's code lies in the io package as well.
The mapreduce package contains code that interfaces directly with Hadoop MapReduce. This package's contents are described in more detail in the next section.

The orm package contains code used for class generation. It depends on the JDK's tools.jar, which provides the com.sun.tools.javac package.
The util package contains various utilities used throughout Sqoop:

ClassLoaderStack manages a stack of ClassLoader instances used by the current thread. This is principally used to load auto-generated code into the current thread when running MapReduce in local (standalone) mode.
DirectImportUtils contains convenience methods used by direct HDFS importers.
Executor launches external processes and connects these to stream handlers generated by an AsyncSink (see more detail below).
ExportException is thrown by ConnManagers when exports fail.
ImportException is thrown by ConnManagers when imports fail.
JdbcUrl handles parsing of connect strings, which are URL-like but not specification-conforming. (In particular, JDBC connect strings may have multi:part:scheme:// components.)
PerfCounters are used to estimate transfer rates for display to the user.
ResultSetPrinter will pretty-print a ResultSet.
In several places, Sqoop reads the stdout from external processes. The most straightforward cases are direct-mode imports as performed by the LocalMySQLManager and DirectPostgresqlManager. After a process is spawned by Runtime.exec(), its stdout (Process.getInputStream()) and potentially stderr (Process.getErrorStream()) must be handled. Failure to read enough data from both of these streams will cause the external process to block before writing more. Consequently, these must both be handled, and preferably asynchronously.
In Sqoop parlance, an "async sink" is a thread that takes an InputStream and reads it to completion. These are realized by AsyncSink implementations. The com.cloudera.sqoop.util.AsyncSink abstract class defines the operations an implementation must perform. processStream() will spawn another thread to immediately begin handling the data read from the InputStream argument; it must read this stream to completion. The join() method allows external threads to wait until this processing is complete.
Some "stock" AsyncSink implementations are provided: the LoggingAsyncSink will repeat everything on the InputStream as log4j INFO statements. The NullAsyncSink consumes all its input and does nothing.
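Put together, handling an external process with these classes might look like the sketch below. It assumes LoggingAsyncSink takes a commons-logging Log in its constructor; verify this against the classes in com.cloudera.sqoop.util before relying on it.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cloudera.sqoop.util.AsyncSink;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.NullAsyncSink;

public class AsyncSinkExample {

  private static final Log LOG = LogFactory.getLog(AsyncSinkExample.class);

  public static int runAndLog(String[] command)
      throws IOException, InterruptedException {
    Process p = Runtime.getRuntime().exec(command);

    // Drain stdout into the log and silently discard stderr; both streams
    // must be consumed or the child process may block on a full pipe buffer.
    AsyncSink outSink = new LoggingAsyncSink(LOG);
    AsyncSink errSink = new NullAsyncSink();
    outSink.processStream(p.getInputStream());
    errSink.processStream(p.getErrorStream());

    int exitCode = p.waitFor();
    // join() blocks until the sink threads have read their streams to completion.
    outSink.join();
    errSink.join();
    return exitCode;
  }
}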
The various ConnManagers that make use of external processes have their own AsyncSink implementations as inner classes, which read from the database tools and forward the data along to HDFS, possibly performing formatting conversions in the meantime.
Sqoop schedules MapReduce jobs to effect imports and exports. Configuration and execution of MapReduce jobs follows a few common steps (configuring the InputFormat; configuring the OutputFormat; setting the Mapper implementation; etc.). These steps are formalized in the com.cloudera.sqoop.mapreduce.JobBase class. The JobBase allows a user to specify the InputFormat, OutputFormat, and Mapper to use.

JobBase itself is subclassed by ImportJobBase and ExportJobBase, which offer better support for the particular configuration steps common to import- or export-related jobs, respectively. ImportJobBase.runImport() will call the configuration steps and run a job to import a table to HDFS.
Subclasses of these base classes exist as well. For example, DataDrivenImportJob uses the DataDrivenDBInputFormat to run an import. This is the most common type of import used by the various ConnManager implementations available. MySQL uses a different class (MySQLDumpImportJob) to run a direct-mode import. Its custom Mapper and InputFormat implementations reside in this package as well.
Sqoop allows users to develop their own plugins. Users can develop their plugins as separate jars, deploy them in $SQOOP_LIB, and register them with Sqoop. In fact, Sqoop's architecture is plugin-based, and all the internal tools such as import, export, and merge are themselves supported as tool plugins. Users can also develop their own custom tool plugins. Once deployed and registered with Sqoop, these plugins work like any other internal tool. They are also listed among the tools when you run the sqoop help command.
BaseSqoopTool is the base class for all Sqoop tools. If you want to develop a custom tool, you need to derive your tool from BaseSqoopTool and override the following methods (a skeleton illustrating these overrides follows the list):
public int run(SqoopOptions options): This is the main method for the tool and acts as the entry point for execution of your custom tool.

public void configureOptions(ToolOptions toolOptions): Configures the command-line arguments we expect to receive. You can also specify the description of each command-line argument; when a user executes sqoop help <your tool>, the information provided in this method is output to the user.

public void applyOptions(CommandLine in, SqoopOptions out): Parses all options and populates SqoopOptions, which acts as a data transfer object during the complete execution.

public void validateOptions(SqoopOptions options): Performs any validation required for your options.
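A minimal skeleton, using the hive-merge tool developed in the rest of this section as the example, could look like the following. The class, package, and option names are the hypothetical ones used below; the com.cloudera.sqoop namespace is used to match the rest of this guide, and you should check RelatedOptions, ToolOptions, and BaseSqoopTool in your Sqoop version for the exact signatures.

import org.apache.commons.cli.OptionBuilder;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.tool.BaseSqoopTool;

public class HiveMergeTool extends BaseSqoopTool {

  // One of the hypothetical options of the hive-merge example used later
  // in this section; the remaining options are defined the same way.
  public static final String MERGE_KEYS = "merge-keys";

  public HiveMergeTool() {
    super("hive-merge"); // the tool name users type after "sqoop"
  }

  @Override
  @SuppressWarnings("static-access")
  public void configureOptions(ToolOptions toolOptions) {
    RelatedOptions mergeOpts = new RelatedOptions("Hive merge arguments");
    mergeOpts.addOption(OptionBuilder.withArgName("columns")
        .hasArg()
        .withDescription("Comma-separated key columns used to match rows")
        .withLongOpt(MERGE_KEYS)
        .create());
    toolOptions.addUniqueOptions(mergeOpts);
    // applyOptions() and validateOptions() would also be overridden;
    // a complete applyOptions() example appears later in this section.
  }

  @Override
  public int run(SqoopOptions options) {
    // Entry point for the tool: perform the merge using the parsed options.
    // Returning non-zero signals failure to the Sqoop driver.
    return 0;
  }
}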
Sqoop parses the arguments passed by the user and stores them in a SqoopOptions object. This object then acts as a data transfer object: it is passed to the various phases of processing, such as preprocessing before running the actual MapReduce job, the MapReduce phase itself, and the postprocessing phase. This class has a lot of members, and the parsed options are populated into the respective members. Now suppose a user creates a new user-defined tool, and this tool has some new options which don't map to any of the existing members of the SqoopOptions class. One approach is to add a new member to the SqoopOptions class, but that means modifying Sqoop and recompiling it, which might not always be possible for all users. Another option is to use the extraArgs member. This is a string array which holds options for third-party tools (such as mysqldump) that are passed directly to the third-party tool; this string array needs to be parsed on every access to interpret the parameters.
The most elegant way of supporting custom options for a user-defined tool is the customToolOptions map, a map member of the SqoopOptions class. The developer can parse the user-defined parameters and populate this map with appropriate key/value pairs. When the SqoopOptions object is passed to the various phases of processing, these values are readily available and no parsing is required on each access.
Let's take an example to understand the usage better. Suppose you want to develop a custom tool to merge two Hive tables, taking the following parameters:
--hive-updates-database
--hive-updates-table
--merge-keys
--retain-updates-tbl
None of these options are available in the SqoopOptions object. The tool developer can override the applyOptions method; there, the user's options can be parsed and populated into the customToolOptions map. Once that is done, the SqoopOptions object can be passed throughout the program and these values will be available wherever they are needed.
The option names will be stored as keys and the values passed by the user will be stored as values. Let's define these options as static finals:
public static final String MERGE_KEYS = "merge-keys";
public static final String HIVE_UPDATES_TABLE = "hive-updates-table";
public static final String HIVE_UPDATES_TABLE_DB = "hive-updates-database";
public static final String RETAIN_UPDATES_TBL = "retain-updates-tbl";
A sample applyOptions implementation which parses the options described above and populates the customToolOptions map is shown below:
public void applyOptions(CommandLine in, SqoopOptions out)
    throws InvalidOptionsException {

  if (in.hasOption(VERBOSE_ARG)) {
    LoggingUtils.setDebugLevel();
    log.debug("Enabled debug logging.");
  }

  if (in.hasOption(HELP_ARG)) {
    ToolOptions toolOpts = new ToolOptions();
    configureOptions(toolOpts);
    printHelp(toolOpts);
    throw new InvalidOptionsException("");
  }

  // Collect the custom merge options into a map keyed by the option names.
  Map<String, String> mergeOptionsMap = new HashMap<String, String>();

  if (in.hasOption(MERGE_KEYS)) {
    mergeOptionsMap.put(MERGE_KEYS, in.getOptionValue(MERGE_KEYS));
  }

  if (in.hasOption(HIVE_UPDATES_TABLE)) {
    mergeOptionsMap.put(HIVE_UPDATES_TABLE, in.getOptionValue(HIVE_UPDATES_TABLE));
  }

  if (in.hasOption(HIVE_UPDATES_TABLE_DB)) {
    mergeOptionsMap.put(HIVE_UPDATES_TABLE_DB, in.getOptionValue(HIVE_UPDATES_TABLE_DB));
  }

  if (in.hasOption(RETAIN_UPDATES_TBL)) {
    // A flag option: its presence in the map is the signal, so store "".
    mergeOptionsMap.put(RETAIN_UPDATES_TBL, "");
  }

  // Options that map to existing SqoopOptions members are set directly.
  if (in.hasOption(HIVE_TABLE_ARG)) {
    out.setHiveTableName(in.getOptionValue(HIVE_TABLE_ARG));
  }

  if (in.hasOption(HIVE_DATABASE_ARG)) {
    out.setHiveDatabaseName(in.getOptionValue(HIVE_DATABASE_ARG));
  }

  if (out.getCustomToolOptions() == null) {
    out.setCustomToolOptions(mergeOptionsMap);
  } else {
    // Merge with any custom options that were already registered.
    out.getCustomToolOptions().putAll(mergeOptionsMap);
  }
}
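With the map populated, any later phase of the tool can read the values back from the SqoopOptions object without re-parsing anything. A minimal sketch of the corresponding lookup inside the tool's run() method (the merge logic itself is omitted):

@Override
public int run(SqoopOptions options) {
  Map<String, String> toolOpts = options.getCustomToolOptions();

  // Values stored in applyOptions() are available directly by key.
  String mergeKeys = toolOpts.get(MERGE_KEYS);
  String updatesTable = toolOpts.get(HIVE_UPDATES_TABLE);
  String updatesDb = toolOpts.get(HIVE_UPDATES_TABLE_DB);
  boolean retainUpdatesTbl = toolOpts.containsKey(RETAIN_UPDATES_TBL);

  // ... perform the merge of the two Hive tables here ...
  return 0;
}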
Once the tool is developed, you need to wrap it in a plugin class and register that plugin class with Sqoop. Your plugin class should extend org.apache.sqoop.tool.ToolPlugin and override the getTools() method.

Example: suppose you have developed a tool called hive-merge which merges two Hive tables, and your tool class is HiveMergeTool. The plugin implementation will look like this:
public class HiveMergePlugin extends ToolPlugin {

  @Override
  public List<ToolDesc> getTools() {
    return Collections.singletonList(new ToolDesc(
        "hive-merge",
        HiveMergeTool.class,
        "This tool is used to merge data from a tmp hive table into a destination hive table."));
  }
}
Finally, you need to copy your plugin jar to the $SQOOP_LIB directory and register the plugin class with Sqoop in sqoop-site.xml:
<property>
  <name>sqoop.tool.plugins</name>
  <value>com.expedia.sqoop.tool.HiveMergePlugin</value>
  <description>A comma-delimited list of ToolPlugin implementations
    which are consulted, in order, to register SqoopTool instances which
    allow third-party tools to be used.
  </description>
</property>