
HadoopDB

Image sources: http://hadoop.apache.org/ and http://wiki.postgresql.org/

Although the mashup of logos above is not what HadoopDB officially means, HadoopDB does use PostgreSQL as its example database. So what is HadoopDB? It proposes, and implements, running a database on each DataNode and integrating those databases into one service architecture. The techniques traditional databases have refined over decades cannot be recounted in a few short years; meanwhile the currently popular Hadoop stores data by default on HDFS, a plain file-storage model. If data management is handed to databases instead, can the performance accumulated by database systems be brought into Hadoop? That is the direction HadoopDB explores: "HadoopDB: An Architectural Hybrid of MapReduce and DBMS Technologies", also described by some as "An Open Source Parallel Database".


Image source: http://hadoopdb.sourceforge.net/guide/

The figure above shows HadoopDB's architecture. An SMS Planner translates SQL statements into MapReduce jobs, and a further translation at the bottom layer turns MapReduce data access back into SQL against the node-local databases. I won't dig into the details here; if you are interested, read the paper.

Since HadoopDB was developed against Hadoop 0.19.x, I still set things up with Hadoop 0.19.2. For the cluster setup itself, see this article: [Linux] 安裝 Hadoop 0.20.1 Multi-Node Cluster @ Ubuntu 9.10. Installing 0.19.2 differs only slightly from 0.20.1: the settings that article puts in hadoop-0.20.1/conf/core-site.xml and hadoop-0.20.1/conf/mapred-site.xml simply go into hadoop-0.19.2/conf/hadoop-site.xml instead. The walkthrough below continues from that tutorial with a 3-node cluster, using Cluster01, Cluster02 and Cluster03 as the example machines, each operated with the hadoop account.
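For reference, here is a minimal sketch of what those settings look like when merged into hadoop-0.19.2/conf/hadoop-site.xml. The Cluster01 host name and the 9000/9001 ports are assumptions that follow common tutorials; use whatever your cluster actually uses:

```xml
<configuration>
  <!-- in 0.20.x this property lives in core-site.xml -->
  <property>
    <name>fs.default.name</name>
    <value>hdfs://Cluster01:9000</value>
  </property>
  <!-- in 0.20.x this property lives in mapred-site.xml -->
  <property>
    <name>mapred.job.tracker</name>
    <value>Cluster01:9001</value>
  </property>
</configuration>
```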

  1. First, build the 3-node cluster on Hadoop 0.19.x
  2. Below, hadoop@Cluster0X:~ means the step must be done on all of Cluster01 ~ Cluster03
  3. Install and configure PostgreSQL on each machine
    • Install it and create a hadoop account in the database; the password is assumed to be 1234
    • hadoop@Cluster0X:~$ sudo apt-get install postgresql
    • hadoop@Cluster0X:~$ sudo vim /etc/postgresql/8.4/main/pg_hba.conf
      • #local   all         all                               ident
        local   all         all                               trust
        # IPv4 local connections:
        #host    all         all         127.0.0.1/32          md5
        host    all         all         127.0.0.1/32          password
        host    all         all         192.168.0.0/16          password            # add the IP range of the cluster machines
        # IPv6 local connections:
        #host    all         all         ::1/128               md5
        host    all         all         ::1/128               password
    • hadoop@Cluster0X:~$ sudo /etc/init.d/postgresql-8.4 restart
    • hadoop@Cluster0X:~$ sudo su - postgres
    • postgres@Cluster0X:~$ createuser hadoop
      • Shall the new role be a superuser? (y/n) y
        postgres@Cluster01:~$ psql
        psql (8.4.2)
        Type "help" for help.

        postgres=# alter user hadoop with password '1234';
        ALTER ROLE
        postgres=# \q
    • Test that the other machines can connect
      • hadoop@Cluster01:~$ createdb testdb
      • hadoop@Cluster02:~$ psql -h Cluster01 testdb
        • Error message
          • psql: FATAL:  no pg_hba.conf entry for host "192.168.56.168", user "hadoop", database "testdb", SSL on
            FATAL:  no pg_hba.conf entry for host "192.168.56.168", user "hadoop", database "testdb", SSL off
        • Expected response
          • Password:
            psql (8.4.2)
            SSL connection (cipher: DHE-RSA-AES256-SHA, bits: 256)
            Type "help" for help.

            testdb=#
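This connectivity test can also be scripted. A minimal sketch, assuming a nodes.txt file listing one IP per line (the same file the catalog generator uses later) and the testdb/hadoop names used above; the function only builds the psql commands, it does not run them:

```python
def build_psql_checks(nodes_file, dbname="testdb", user="hadoop"):
    """Read one IP per line from nodes_file and build a psql
    connectivity-test command for each node, skipping blank lines."""
    cmds = []
    for line in open(nodes_file):
        host = line.strip()
        if host:
            cmds.append("psql -h %s -U %s %s -c 'select 1;'" % (host, user, dbname))
    return cmds
```

Feed the printed commands to a shell on any node; each one should prompt for the password and return a single row.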
  4. Configure HadoopDB
    • hadoop@Cluster0X:~$ cp hadoopdb.jar $HADOOP_HOME/lib/
    • hadoop@Cluster0X:~$ cp postgresql-8.4-701.jdbc4.jar $HADOOP_HOME/lib/
    • hadoop@Cluster0X:~$ vim $HADOOP_HOME/conf/hadoop-site.xml

      • <property>
        <name>hadoopdb.config.file</name>
        <value>HadoopDB.xml</value>
        <description>The name of the HadoopDB cluster configuration file</description>
        </property>



        <property>
        <name>hadoopdb.fetch.size</name>
        <value>1000</value>
        <description>The number of records fetched from JDBC ResultSet at once</description>
        </property>


        <property>
        <name>hadoopdb.config.replication</name>
        <value>false</value>
        <description>Tells HadoopDB Catalog whether replication is enabled.
        Replica locations need to be specified in the catalog.
        False causes replica information to be ignored.</description>
        </property>
    • hadoop@Cluster01:~$ vim nodes.txt
      • 192.168.56.168
        192.168.56.169
        192.168.56.170
    • hadoop@Cluster01:~$ vim Catalog.properties
      • #Properties for Catalog Generation
        ##################################
        nodes_file=nodes.txt
        # Relations Name and Table Name are the same
        relations_unchunked=raw
        relations_chunked=poi
        catalog_file=HadoopDB.xml
        ##
        #DB Connection Parameters
        ##
        port=5432
        username=hadoop
        password=1234
        driver=org.postgresql.Driver
        url_prefix=jdbc\:postgresql\://
        ##
        #Chunking properties
        ##
        # the number of databases on a node
        chunks_per_node=3
        # for udb0 ,udb1, udb2 ( 3 nodes = 0 ~ 2 )
        unchunked_db_prefix=udb
        # for cdb0 ,cdb1, ... , cdb8 ( 3 nodes x 3 chunks = 0~8 )
        chunked_db_prefix=cdb
        ##
        #Replication Properties
        ##
        dump_script_prefix=/root/dump_
        replication_script_prefix=/root/load_replica_
        dump_file_u_prefix=/mnt/dump_udb
        dump_file_c_prefix=/mnt/dump_cdb
        ##
        #Cluster Connection
        ##
        ssh_key=id_rsa-gsg-keypair
    • hadoop@Cluster01:~$ java -cp lib/hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator Catalog.properties
      • The generated HadoopDB.xml looks something like this:
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <DBClusterConfiguration xmlns="http://edu.yale.cs.db.hadoop/DBConfigurationSchema">
            <Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.168">
                <Relations id="raw">
                    <Partitions url="jdbc:postgresql://192.168.56.168:5432/udb0" id="0"/>
                </Relations>
                <Relations id="poi">
                    <Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb0" id="0"/>
                    <Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb1" id="1"/>
                    <Partitions url="jdbc:postgresql://192.168.56.168:5432/cdb2" id="2"/>
                </Relations>
            </Nodes>
            <Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.169">
                <Relations id="raw">
                    <Partitions url="jdbc:postgresql://192.168.56.169:5432/udb1" id="1"/>
                </Relations>
                <Relations id="poi">
                    <Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb3" id="3"/>
                    <Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb4" id="4"/>
                    <Partitions url="jdbc:postgresql://192.168.56.169:5432/cdb5" id="5"/>
                </Relations>
            </Nodes>
            <Nodes Password="1234" Username="hadoop" Driver="org.postgresql.Driver" Location="192.168.56.170">
                <Relations id="raw">
                    <Partitions url="jdbc:postgresql://192.168.56.170:5432/udb2" id="2"/>
                </Relations>
                <Relations id="poi">
                    <Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb6" id="6"/>
                    <Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb7" id="7"/>
                    <Partitions url="jdbc:postgresql://192.168.56.170:5432/cdb8" id="8"/>
                </Relations>
            </Nodes>
        </DBClusterConfiguration>
    • hadoop@Cluster01:~$ hadoop dfs -put HadoopDB.xml HadoopDB.xml
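SimpleCatalogGenerator's assignment is mechanical, so the layout can be previewed before running it: node i receives udb<i>, and chunk databases are numbered consecutively across nodes. A small sketch of that rule (the function name is mine; only the numbering logic is modeled, not the generated XML):

```python
def catalog_layout(nodes, chunks_per_node, unchunked_prefix="udb", chunked_prefix="cdb"):
    """Map each node to its unchunked database and its chunk databases,
    following the numbering visible in the generated HadoopDB.xml."""
    layout = {}
    for i, node in enumerate(nodes):
        chunks = [chunked_prefix + str(i * chunks_per_node + j)
                  for j in range(chunks_per_node)]
        layout[node] = {"unchunked": unchunked_prefix + str(i), "chunked": chunks}
    return layout
```

With the three example IPs and chunks_per_node=3 this reproduces udb0~udb2 and cdb0~cdb8 exactly as in the XML above.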
  5. Create the tables, load the test data into each machine's database, and create the matching table in Hive
    • Here the raw table serves as the example. Suppose HadoopDB.xml lists three partitions for the raw table, i.e. udb0, udb1 and udb2 in the example above; then those databases must be created on the machines it assigns them to
      • hadoop@Cluster01:~$ createdb udb0
        hadoop@Cluster02:~$ createdb udb1
        hadoop@Cluster03:~$ createdb udb2
    • Then create the table matching the input data
      • hadoop@Cluster01:~$ psql udb0
        udb0=#
        CREATE TABLE raw (
        ID int,
        NAME varchar(300)
        );
      • Likewise on Cluster02 and Cluster03
    • Import the data
      • hadoop@Cluster01:~$ psql udb0
        udb0=# COPY RAW FROM '/home/hadoop/p0' WITH DELIMITER E'\t' ;
      • The data in /home/hadoop/p0 comes from splitting the original large file with the partitioning tool HadoopDB provides
        • $ hadoop jar lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src_in_hdfs out_in_hdfs 3 '\n' 0
        • $ hadoop fs -get out_in_hdfs/part-00000 /home/hadoop/p0
      • This assumes the data sits in /home/hadoop/p0 with tab-separated fields
      • Do the same on Cluster02 and Cluster03
    • Finally, create the matching table in Hive (run this on one machine only)
      • Suppose the tables Hive uses will be stored under /db in HDFS
      • hadoop@Cluster01:~ $ hadoop dfs -mkdir /db
      • hadoop@Cluster01:~ $ SMS_dist/bin/hive
        CREATE EXTERNAL TABLE raw (
        ID int,
        NAME string
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '|'
        STORED AS
        INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION '/db/raw';
      • The basename of /db/raw must match the table name (the tables in each node's database and the table created in Hive must use the same name); also remember to convert the column types
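What GlobalHasher does in step 5 can be pictured with a small local sketch: hash one key field of every record and route the record to one of N partitions, one per node. The real tool runs as a MapReduce job and uses its own hash function, so the bucket assignments below are illustrative only:

```python
def partition_records(lines, num_parts, field_delim="\t", key_field=0):
    """Group delimiter-separated records into num_parts buckets by
    hashing the key field; each bucket would be loaded into one node."""
    parts = [[] for _ in range(num_parts)]
    for line in lines:
        key = line.split(field_delim)[key_field]
        # toy deterministic hash; HadoopDB's partitioner differs
        bucket = sum(ord(c) for c in key) % num_parts
        parts[bucket].append(line)
    return parts
```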
  6. Once everything above is configured, run $ SMS_dist/bin/hive on one machine (e.g. Cluster01) to see the result
    • hadoop@Cluster01:~ $ SMS_dist/bin/hive
      hive> show tables;
      hive> select name from raw;
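The column-type conversion mentioned in step 5 (PostgreSQL DDL vs Hive DDL) can be mechanized with a regex substitution, which is also what my batch script further down does. A minimal sketch covering only varchar/text; other types would need their own rules:

```python
import re

def pg_fields_to_hive(field_defs):
    """Rewrite a PostgreSQL column list such as 'ID int, NAME varchar(300)'
    for Hive: varchar(n) and text both become string, names lowercased."""
    return re.sub(r'varchar\(\d+\)|text', 'string', field_defs.lower())
```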

The example above uses unchunked mode. With chunked mode, three machines and three databases per machine multiply into a 3 x 3 layout, nine databases in total, each needing database creation, table creation, data import and so on, so in practice it is worth writing a script. The HadoopDB site has an example script that creates a /my_data directory on every node; I adapted it slightly:

#!/usr/bin/python
import sys, os, thread
       
DEBUG_MODE = True
                       
completed = {} 
create_db_cmd_list = [ ''
        , 'createdb testdb'
        , 'echo "CREATE TABLE Helo ( ID int );" | psql testdb'
        , 'dropdb testdb'
]              
                       
cmd_list = []
cmd_list.extend( create_db_cmd_list )

def executeThread(node, *args ):
        #Make sure key is accessible and is the correct key name.
        #os.system("ssh -i key -o 'StrictHostKeyChecking=no' %s \'mkdir /my_data \'" %(node))
        #You could replace the mkdir command with another command line or add more command lines,
        # as long as you prefix the command with the ssh connection.

        if DEBUG_MODE :
                print "\tShow Only"
        for cmd in cmd_list :
                if cmd == None or cmd == '' :
                        continue;
                cmd = cmd.strip()
                if cmd == '' :
                        continue;
                cmd_exec = "ssh %s \'%s\'" % (node , cmd )
                print "\t" , cmd_exec
                if DEBUG_MODE == False :
                        os.system( cmd_exec )
        completed[node] = "true"

def main( argv=None ):
        hostfile = "nodes.txt"
        internalips = open(hostfile,'r').readlines()

        for i in internalips:
                os.system('sleep 1')

                node_info = i.strip() ,
                thread.start_new_thread(executeThread, node_info  )

        while (len(completed.keys()) < len(internalips)):
                os.system('sleep 2')

        print "Execution Completed"

if __name__ == "__main__":
        main()

Finally, here is my own large rewrite. If you want to use it, I strongly suggest trying it in a virtual environment first to verify the flow, and ideally doing the whole setup by hand once; adapt it to your own needs only after the flow is clear. Data to prepare before running it:

  1. Put the source data in HDFS; by default records are separated by newlines and fields within a record by tabs, for example:
    • 1    changyy
      2    hello world
      3    hadoop
  2. Create nodes.txt on the local machine, listing each cluster machine's IP, one per line ('\n'-separated), for example:
    • 192.168.56.168
      192.168.56.169
      192.168.56.170
  3. Create the table's column definitions, by default read from a file named table_create, for example:
    • ID int ,
      NAME varchar(250)
  4. Finally, run $ python this.py -help to see what can be configured (described in my broken English)

Show only the commands that would be executed; pay attention to which directories and databases it would delete
$ python this.py --source_dir_in_hdfs src

Actually run it
$ python this.py --source_dir_in_hdfs src -go

The default is unchunked; to chunk, set the number of chunks with --chunk_num
$ python this.py --source_dir_in_hdfs src --chunk_num 3

A sample run:

  • hadoop@Cluster01:~$ cat nodes.txt
    192.168.56.168
    192.168.56.169
    192.168.56.170
  • hadoop@Cluster01:~$ cat table_create
    ID int,
    NAME varchar(250)
  • hadoop@Cluster01:~$ python batch_setup.py --source_dir_in_hdfs src

     Current Status is just Debug Mode for show all commands
     please set '-g' or '--go' option to execute them after check all commands.(look at the 'rm -rf' and 'hadoop fs -rmr')

    $ /usr/bin/java -cp /home/hadoop/lib/hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator /tmp/Catalog.properties

    => Start to put the HadoopDB.xml into HDFS

    $ /home/hadoop/bin/hadoop fs -rmr HadoopDB.xml
    $ /home/hadoop/bin/hadoop fs -put HadoopDB.xml HadoopDB.xml

    => The data source(src) would be partitioned into 3 parts(tmp_out_hadoopdb) by the delimiter (\n)

    $ /home/hadoop/bin/hadoop fs -rmr tmp_out_hadoopdb
    $ /home/hadoop/bin/hadoop jar /home/hadoop/lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src tmp_out_hadoopdb 3 '\n' 0

    => To configure your nodes...

        ssh 192.168.56.168 "dropdb udb_hadoopdb_0"
        ssh 192.168.56.168 "createdb udb_hadoopdb_0"
        ssh 192.168.56.168 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_0"
        ssh 192.168.56.168 "rm -rf /tmp/out_for_global_parition"
        ssh 192.168.56.168 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00000 /tmp/out_for_global_parition"
        ssh 192.168.56.168 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_0"
        ssh 192.168.56.168 "rm -rf /tmp/out_for_global_parition"
        ssh 192.168.56.170 "dropdb udb_hadoopdb_2"
        ssh 192.168.56.170 "createdb udb_hadoopdb_2"
        ssh 192.168.56.170 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_2"
        ssh 192.168.56.170 "rm -rf /tmp/out_for_global_parition"
        ssh 192.168.56.170 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00002 /tmp/out_for_global_parition"
        ssh 192.168.56.170 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_2"
        ssh 192.168.56.170 "rm -rf /tmp/out_for_global_parition"
        ssh 192.168.56.169 "dropdb udb_hadoopdb_1"
        ssh 192.168.56.169 "createdb udb_hadoopdb_1"
        ssh 192.168.56.169 "echo \"create table hadoopdb ( id int, name varchar(250) );\" | psql udb_hadoopdb_1"
        ssh 192.168.56.169 "rm -rf /tmp/out_for_global_parition"
        ssh 192.168.56.169 "/home/hadoop/bin/hadoop fs -get tmp_out_hadoopdb/part-00001 /tmp/out_for_global_parition"
        ssh 192.168.56.169 "echo \"COPY hadoopdb FROM '/tmp/out_for_global_parition' WITH DELIMITER E'\t';\" | psql udb_hadoopdb_1"
        ssh 192.168.56.169 "rm -rf /tmp/out_for_global_parition"
    $ /home/hadoop/bin/hadoop fs -rmr tmp_out_hadoopdb

    => To setup the external table for Hive

    $ /home/hadoop/bin/hadoop fs -mkdir /db
    $ /home/hadoop/bin/hadoop fs -rmr /db/hadoopdb
    $  echo "drop table hadoopdb;" | /home/hadoop/SMS_dist/bin/hive
    $  echo "create external table hadoopdb ( id int, name string )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS  INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  LOCATION '/db/hadoopdb'; " | /home/hadoop/SMS_dist/bin/hive


    => All Execution Completed...

#!/usr/bin/python
# At Python 2.6.4
# Yuan-Yi Chang
# 2010/01/07 15:09
#
import sys, os, thread, commands
import re, os.path
from optparse import OptionParser

BIN_JAVA = '/usr/bin/java'
BIN_HADOOP = '/home/hadoop/bin/hadoop'
BIN_HIVE = '/home/hadoop/SMS_dist/bin/hive'
JAR_HADOOPDB = '/home/hadoop/lib/hadoopdb.jar'

completed = {}
cmd_for_node = {}

def initHadoopDB(    data_in_hdfs = None , data_delimiter = '\n' , data_field_delimiter = '\t' ,
            data_partition_out = None ,
            nodes_in_file = 'nodes.txt' , chunks_per_node = 3 ,
            table_name = None , table_field_info = None ,
            db_user = 'hadoop' , db_pass='1234' , db_field_delimiter = '|' , hive_db_dir_in_hdfs = '/db' ,
            tmp_path_for_catelog = '/tmp/Catalog.properties' , out_hadoop_xml = 'HadoopDB.xml' , hadoop_xml_in_hdfs = 'HadoopDB.xml' ,
            DEBUG_MODE = True ) :
    if data_in_hdfs is None :
        print 'Please input the path of the data source in HDFS'
        return False
    if data_partition_out is None :
        print 'Please input the path for the data source parition in HDFS'
        return False
    if table_name is None or not re.match( r'^[a-z0-9_]+$' , table_name ) :
        print 'Please input the table name with [a-z0-9_] only'
        return False
    if table_field_info is None or os.path.isfile( table_field_info ) is False :
        print 'Please check the "table_field_info" : ' + str(table_field_info)
        return False
        
    if os.path.isfile( nodes_in_file ) is False :
        print 'Please check the "nodes_in_file" : ' + nodes_in_file
        return False
    if chunks_per_node < 0 :
        print 'Please check the "chunks_per_node" : ' + str(chunks_per_node) + ' , 0 for no chunk'
        return False

    data_delimiter = data_delimiter.replace( '\n' , '\\n' ).replace( '\t' , '\\t' )
    data_field_delimiter = data_field_delimiter.replace( '\t' , '\\t' ).replace( '\n' , '\\n' )
    db_field_delimiter = db_field_delimiter.replace( '\t' , '\\t' ).replace( '\n' , '\\n' )

    make_catelog = ''
    #Properties for Catalog Generation'
    ##################################
    make_catelog += 'nodes_file='+nodes_in_file+'\n'
    if chunks_per_node < 2 :
        make_catelog += 'relations_chunked=no_use' + '\n'
        make_catelog += 'relations_unchunked='+table_name + '\n'
    else:
        make_catelog += 'relations_unchunked=' + 'no_use' + '\n'
        make_catelog += 'relations_chunked='+table_name + '\n'
    make_catelog += 'catalog_file=' + out_hadoop_xml + '\n'
    ##
    #DB Connection Parameters
    ##
    make_catelog += 'port=5432' + '\n'
    make_catelog += 'username=' + db_user + '\n'
    make_catelog += 'password=' + db_pass + '\n'
    make_catelog += 'driver=org.postgresql.Driver' + '\n'
    make_catelog += 'url_prefix=jdbc\\:postgresql\\://'+ '\n'
    ##
    #Chunking properties
    ##
    make_catelog += 'chunks_per_node=' + str(chunks_per_node) + '\n'
    make_catelog += 'unchunked_db_prefix=udb_' + table_name + '_' + '\n'
    make_catelog += 'chunked_db_prefix=cdb_'+ table_name + '_' + '\n'
    ##
    #Replication Properties
    ##
    make_catelog += 'dump_script_prefix=/root/dump' + '\n'
    make_catelog += 'replication_script_prefix=/root/load_replica_' + '\n'
    make_catelog += 'dump_file_u_prefix=/mnt/dump_udb' + '\n'
    make_catelog += 'dump_file_c_prefix=/mnt/dump_cdb'+ '\n'
    ##
    #Cluster Connection
    ##
    make_catelog += 'ssh_key=id_rsa-gsg-keypair' + '\n'
    
    try:
        f = open( tmp_path_for_catelog , 'w' )
        f.write( make_catelog )
        f.close()
    except:
        print 'Error to write a catelog:'+tmp_path_for_catelog
        return False
    cmd_exec = BIN_JAVA + ' -cp ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator ' + tmp_path_for_catelog
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

    if os.path.isfile( out_hadoop_xml ) is False :
        print 'Please check the "out_hadoop_xml" : ' + out_hadoop_xml
        return False

    print '\n=> Start to put the HadoopDB.xml into HDFS\n'

    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + hadoop_xml_in_hdfs
        print '$ ' + BIN_HADOOP + ' fs -put ' + out_hadoop_xml + ' ' + hadoop_xml_in_hdfs
    else:
        os.system( BIN_HADOOP + ' fs -rmr ' + hadoop_xml_in_hdfs )
        os.system( BIN_HADOOP + ' fs -put ' + out_hadoop_xml + ' ' + hadoop_xml_in_hdfs  )

    partition_num = 0
    node_list = []
    try:
        tmp_list = open( nodes_in_file ,'r').readlines()
        for line in tmp_list :
            line = line.strip()
            if line <> '' :
                node_list.append( line )
        partition_num = len( node_list )
    except:
        print 'Please check the "nodes_in_file" : ' + nodes_in_file
        return False

    if partition_num > 1 :
        cmd_exec = BIN_HADOOP + ' jar ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.dataloader.GlobalHasher ' + data_in_hdfs + ' ' + data_partition_out + ' ' + str(partition_num) + ' \'' + data_delimiter + '\' 0 '

        print '\n=> The data source('+data_in_hdfs+') would be partitioned into '+str(partition_num)+' parts('+data_partition_out+') by the delimiter ('+data_delimiter+')\n'
        if DEBUG_MODE :
            print '$ ' + BIN_HADOOP + ' fs -rmr ' + data_partition_out
            print '$ ' + cmd_exec
        else:
            os.system( BIN_HADOOP + ' fs -rmr ' + data_partition_out )
            os.system( cmd_exec )
    else:
        print '\n=> The number of datanodes should be > 1\n'
        return False

    HadoopDB_Info = ''
    try:
        HadoopDB_Info = open( out_hadoop_xml , 'r' ).read()
    except:
        print 'Error at read "out_hadoop_xml" : ' + out_hadoop_xml
        return False
    if HadoopDB_Info == '' :
        print 'The info in the file is empty : ' + HadoopDB_Info
        return False

    DB_TABLE_CREATE_INFO = ''
    try:
        DB_TABLE_CREATE_INFO = open( table_field_info , 'r' ).read().strip()
    except:
        print 'Error at read "table_field_info" : ' + table_field_info
        return False

    if DB_TABLE_CREATE_INFO == '' :
        print 'The info in the file is empty : ' + DB_TABLE_CREATE_INFO
        return False
    DB_TABLE_CREATE_INFO = DB_TABLE_CREATE_INFO.replace( "\n" , ' ' ).replace( '"' , '\\"' ).lower()
    DB_TABLE_CREATE_INFO = 'create table ' + table_name + ' ( ' + DB_TABLE_CREATE_INFO + ' );'

    #print node_list
    partition_index = 0
    for node in node_list:
        cmd_for_node[ node ] = []
        if chunks_per_node == 0 :    # use unchunked mode
            db_list = re.findall( '' + node +':[\d]+/(udb_' + table_name + '_'+'[\w]+)' , HadoopDB_Info )
            for sub_db in db_list :
                # Create Database & Table
                cmd_for_node[ node ].append( 'dropdb ' + sub_db )
                cmd_for_node[ node ].append( 'createdb ' + sub_db )
                cmd_for_node[ node ].append( 'echo "'+DB_TABLE_CREATE_INFO+'" | psql '+ sub_db )
                cmd_for_node[ node ].append( 'rm -rf /tmp/out_for_global_parition' )
                cmd_for_node[ node ].append( BIN_HADOOP + ' fs -get ' + data_partition_out + '/part-%0.5d /tmp/out_for_global_parition' % partition_index )
                cmd_for_node[ node ].append( 'echo "COPY '+table_name+' FROM \'/tmp/out_for_global_parition\' WITH DELIMITER E\''+data_field_delimiter+'\';" | psql '+ sub_db )
                cmd_for_node[ node ].append( 'rm -rf /tmp/out_for_global_parition' )
        else:
            db_list = re.findall( '' + node +':[\d]+/(cdb_' + table_name + '_'+'[\w]+)' , HadoopDB_Info )
            if db_list <> None :
                cmd_for_node[ node ].append( 'rm -rf /tmp/*out_for_global_parition' )
                cmd_for_node[ node ].append( BIN_HADOOP + ' fs -get ' + data_partition_out + '/part-%0.5d /tmp/out_for_global_parition' % partition_index )
                cmd_for_node[ node ].append( 'cd /tmp; ' + BIN_JAVA + ' -cp ' + JAR_HADOOPDB + ' edu.yale.cs.hadoopdb.dataloader.LocalHasher out_for_global_parition ' + str( chunks_per_node ) + ' \'' + data_delimiter + '\' 0 ' )
                sub_part = 0
                for sub_db in db_list :
                    # Create Database & Table
                    cmd_for_node[ node ].append( 'dropdb ' + sub_db )
                    cmd_for_node[ node ].append( 'createdb ' + sub_db )
                    cmd_for_node[ node ].append( 'echo "'+DB_TABLE_CREATE_INFO+'" | psql '+ sub_db )
                    cmd_for_node[ node ].append( 'echo "COPY '+table_name+' FROM \'/tmp/'+str(sub_part)+'-out_for_global_parition\' WITH DELIMITER E\''+data_field_delimiter+'\';" | psql '+ sub_db )

                    sub_part = sub_part + 1
                    #cmd_for_node[ node ].append( 'rm -rf /tmp/'+str(sub_part)+'-out_for_global_parition' )
                
                cmd_for_node[ node ].append( 'rm -rf /tmp/*out_for_global_parition' )

        partition_index = partition_index + 1
    
    print '\n=> To configure your nodes...\n'

    for node in node_list:
        thread.start_new_thread( executeThreadForNode , ( node, DEBUG_MODE ) )

    while (len(completed.keys()) < len(node_list) ) :
        os.system('sleep 2')

    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + data_partition_out
    else:
        os.system( BIN_HADOOP + ' fs -rmr ' + data_partition_out )

    print '\n=> To setup the external table for Hive\n'
    if DEBUG_MODE :
        print '$ ' + BIN_HADOOP + ' fs -mkdir ' + hive_db_dir_in_hdfs
        print '$ ' + BIN_HADOOP + ' fs -rmr ' + hive_db_dir_in_hdfs + '/' + table_name
    else:
        os.system( BIN_HADOOP + ' fs -mkdir ' + hive_db_dir_in_hdfs )
        os.system( BIN_HADOOP + ' fs -rmr ' + hive_db_dir_in_hdfs + '/' + table_name  )
    
    cmd_exec = ' echo "drop table '+table_name+';" | ' + BIN_HIVE
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

    create_hive_external_table =     ' ROW FORMAT DELIMITED FIELDS TERMINATED BY \'' + db_field_delimiter + '\''
    create_hive_external_table +=    ' STORED AS '
    create_hive_external_table +=    ' INPUTFORMAT \'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat\' '
    create_hive_external_table +=    ' OUTPUTFORMAT \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\' '
    create_hive_external_table +=    ' LOCATION \'' + hive_db_dir_in_hdfs + '/' + table_name + '\'; '

    DB_TABLE_CREATE_INFO = DB_TABLE_CREATE_INFO.replace( ";" , ' ' ).replace( 'precision' , '' ).replace( 'create table' , 'create external table' )
    DB_TABLE_CREATE_INFO = re.sub( 'varchar\([\d]+\)|text' , 'string' , DB_TABLE_CREATE_INFO )
    create_hive_external_table = DB_TABLE_CREATE_INFO + create_hive_external_table

    cmd_exec = ' echo "'+create_hive_external_table+'" | ' + BIN_HIVE
    if DEBUG_MODE :
        print '$ ' + cmd_exec
    else:
        os.system( cmd_exec )

def executeThreadForNode(node,DEBUG_MODE=True, *args ):
    for cmd in cmd_for_node[node] :
        if cmd == None or cmd == '' :
            continue;
        cmd = cmd.strip()
        if cmd == '' :
            continue;
        cmd = cmd.replace( '"' , '\\"' )
        cmd_exec = "ssh %s \"%s\"" % (node , cmd )
        print "\t" , cmd_exec
        if DEBUG_MODE == False :
            os.system( cmd_exec )
    completed[node] = "true"

def main( argv=None ):

    parser = OptionParser()
    parser.add_option( "-H" , "--source_dir_in_hdfs" , dest="source_dir_in_hdfs" , default=None, help="dir for data source in HDFS" )
    parser.add_option( "-D" , "--source_data_delimiter" , dest="source_data_delimiter" , default='\n' , help="record delimtier for the source" )
    parser.add_option( "-F" , "--source_field_delimiter" , dest="source_field_delimiter" , default='\t' , help="field delimiter for a record" )
    parser.add_option( "-P" , "--source_partition_dir" , dest="source_partition_dir" , default="tmp_out_hadoopdb" , help="temp dir in HDFS for source partition" )
    parser.add_option( "-N" , "--node_list_file" , dest="node_list_file" , default="nodes.txt" , help="path for a file saved each node's IP address" )
    parser.add_option( "-c" , "--chunk_num" , dest="chunk_num" , default=0 , type="int" , help="number of databases for each node" )
    parser.add_option( "-t" , "--table_name" , dest="table_name" , default="hadoopdb" , help="table name for creation on Hive and databases" )
    parser.add_option( "-i" , "--table_field_info_file" , dest="table_field_info_file" , default="table_create", help="file for table field definition only" )
    parser.add_option( "-u" , "--db_username" , dest="db_username" , default="hadoop" , help="username for login the databases on each node" )
    parser.add_option( "-p" , "--db_password" , dest="db_password" , default="1234" , help="password for login the databases on each node" )
    parser.add_option( "-d" , "--db_field_delimiter" , dest="db_field_delimiter" , default="|" , help="field delimiter for the databases" )
    parser.add_option( "-w" , "--hive_db_dir" , dest="hive_db_dir" , default='/db' , help="path in HDFS for Hive to save the tables" )
    parser.add_option( "-f" , "--catalog_properties" , dest="catalog_properties" , default='/tmp/Catalog.properties' , help="output file for Catalog.Properties" )
    parser.add_option( "-x" , "--hadoopdb_xml" , dest="hadoopdb_xml" , default="HadoopDB.xml" , help="output file for HadoopDB.xml" )
    parser.add_option( "-y" , "--hadoopdb_xml_in_hdfs" , dest="hadoopdb_xml_in_hdfs" , default="HadoopDB.xml" , help="filename for HadoopDB.xml in HDFS" )
    parser.add_option( "-g" , "--go" , action="store_false" , dest="mode" , default=True , help="set it to execute the commands" )

    ( options, args ) = parser.parse_args()
    #print options
    #return    

    #initHadoopDB(    data_in_hdfs='src' , data_partition_out='tmp_out' , table_name='justtest' , table_field_info='table_create'  )

    if options.source_dir_in_hdfs is None :
        print "Please input the source dir in HDFS by '--source_dir_in_hdfs' "
        return
    if os.path.isfile( options.node_list_file ) is False :
        print "Please check the '" + options.node_list_file + "' path and setup by '--node_list_file'"

    if options.mode is True :
        print "\n Current Status is just Debug Mode for show all commands\n please set '-g' or '--go' option to execute them after check all commands.(look at the 'rm -rf' and 'hadoop fs -rmr')\n"
    
    initHadoopDB( data_in_hdfs = options.source_dir_in_hdfs, data_delimiter = options.source_data_delimiter, data_field_delimiter = options.source_field_delimiter,data_partition_out = options.source_partition_dir, nodes_in_file = options.node_list_file, chunks_per_node = options.chunk_num, table_name = options.table_name, table_field_info = options.table_field_info_file, db_user = options.db_username, db_pass = options.db_password, db_field_delimiter = options.db_field_delimiter, hive_db_dir_in_hdfs = options.hive_db_dir, tmp_path_for_catelog = options.catalog_properties, out_hadoop_xml = options.hadoopdb_xml, hadoop_xml_in_hdfs = options.hadoopdb_xml_in_hdfs, DEBUG_MODE = options.mode )

    print "\n\n=> All Execution Completed..."

if __name__ == "__main__":
    main()

Some common problems I ran into, recorded here as well:

hadoop@Cluster01:~$ echo "drop table justest;" | /home/hadoop/SMS_dist/bin/hive
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201001081445_409015456.txt
hive> drop table justest;
FAILED: Error in metadata: javax.jdo.JDOFatalDataStoreException: Failed to start database 'metastore_db', see the next exception for details.
NestedThrowables:
java.sql.SQLException: Failed to start database 'metastore_db', see the next exception for details.
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
hive>

These messages appear because two clients are using Hive at the same time; only one client can operate it at any moment.
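Hive's default metastore is an embedded Derby database, and Derby marks an open database with a db.lck file inside the metastore_db directory (created in whichever directory you launched hive from). A hedged sketch for spotting the situation before starting a second client:

```python
import os

def metastore_locked(metastore_dir="metastore_db"):
    """Return True if Derby's lock file exists, i.e. another Hive client
    probably has the embedded metastore open right now."""
    return os.path.exists(os.path.join(metastore_dir, "db.lck"))
```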

hadoop@Cluster01:~$ sudo apt-get install postgresql
hadoop@Cluster01:~$ sudo vim /etc/postgresql/8.4/main/pg_hba.conf
#local   all         all                               ident
local   all         all                               trust
# IPv4 local connections:
#host    all         all         127.0.0.1/32          md5
host    all         all         127.0.0.1/32          password
host    all         all         10.0.0.0/8          password            # add the IP range of the cluster machines
# IPv6 local connections:
#host    all         all         ::1/128               md5
host    all         all         ::1/128               password
hadoop@Cluster01:~$ sudo vim /etc/postgresql/8.4/main/postgresql.conf
listen_addresses = '*'
hadoop@Cluster01:~$ sudo /etc/init.d/postgresql-8.4 restart
hadoop@Cluster01:~$ sudo su - postgres
postgres@Cluster01:~$ createuser hadoop
Shall the new role be a superuser? (y/n) y
postgres@Cluster01:~$ psql
psql (8.4.2)
Type "help" for help.

postgres=# alter user hadoop with password '1234';
ALTER ROLE
postgres=# \q
postgres@Cluster01:~$ exit
logout



Comments (3)
  • 曼森
  • Hive history file=/tmp/hadoop/hive_job_log_hadoop_201001081445_409015456.txt

    Is there any way to solve this?
  • Sorry, I haven't played with this for a while and I can't understand your question Orz

    changyy replied on 2010/05/27 10:00

  • manson
  • hadoop@Cluster01:~ $ SMS_dist/bin/hive
    ###########################
    After entering the command above, this message appears:
    Hive history file=/tmp/hadoop/hive_job_log_hadoop_201005271048_1660712084.txt
    Hive>

    Is this message an error? How should it be fixed?
    ###########################
    CREATE EXTERNAL TABLE raw (
    ID int,
    NAME string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    STORED AS
    INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '/db/raw';
  • Oh, that should be fine. It's just an informational message saying that the upcoming commands will be logged to that file; a real error would start with "FAILED:" or similar. That said, I haven't played with this for a long time and no longer have an environment to run it in, so I can't really confirm it for you

    One last note: HadoopDB is integrated with Hive, so problems can be hard to debug. It helps to first understand roughly what Hive does: it provides a framework that translates SQL statements into Hadoop jobs, and HadoopDB essentially reuses Hive's framework while changing the part that reads the underlying data to use databases instead

    So if the configuration isn't right, problems appear easily. I'd suggest first running a few jobs with plain Hive, and only then layering HadoopDB on top

    Good luck with it!

    changyy replied on 2010/05/27 11:08

  • 曼森
  • Import the data

    * hadoop@Cluster01:~$ psql udb0
    udb0=# COPY RAW FROM '/home/hadoop/p0' WITH DELIMITER E'\t' ;
    * The data in /home/hadoop/p0 comes from splitting the original large file with the partitioning tool HadoopDB provides
    o $ hadoop jar lib/hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher src_in_hdfs out_in_hdfs 3 '\n' 0
    o $ hadoop fs -get out_in_hdfs/part-00000 /home/hadoop/p0
    * This assumes the data sits in /home/hadoop/p0 with tab-separated fields
    * Do the same on Cluster02 and Cluster03
    ###################
    Should the src_in_hdfs directory have data in it?
    I'm using hadoop-0.20.1, so I'm not sure about this part.
    I created src_in_hdfs myself with -mkdir src_in_hdfs.
    After splitting, the output contains
    out_in_hdfs/part-0000
    out_in_hdfs/part-0001
    out_in_hdfs/part-0002
    but they are all empty.
    Could this be because of the version difference?
  • I used Hadoop 0.19.2 back then mainly because that is the version HadoopDB targets.

    Hadoop 0.20.x did change parts of the code structure relative to 0.19.x, so the code in HadoopDB may not run correctly on 0.20.1; it could well be a version problem. Check whether HadoopDB has been updated; otherwise I'd recommend sticking with Hadoop 0.19.x!

    That said, this brings other problems: for general Hadoop work it is better to develop against the newer versions, since 0.19.x and 0.20.x differ, and staying on 0.19.x drifts further and further from current development orz

    changyy replied on 2010/05/27 13:37