使用到的包:xml.sax 文档

代码托管位置 github-pytools

需求

读取xml数据文件,文件较大,需要实时处理插入到数据库

xml文档

<PERSONS>
<person>
    <id>100000</id>
    <sex></sex>
    <address>北京,海淀区</address>
    <fansNum>437</fansNum>
    <summary>1989</summary>
    <wbNum>333</wbNum>
    <gzNum>242</gzNum>
    <blog>null</blog>
    <edu>大学</edu>
    <work></work>
    <renZh>1</renZh>
    <brithday>2月14日</brithday>
</person>
</PERSONS>

处理

sax处理时并不会像dom一样可以以类似节点的维度进行读取,它只有 开始标签 内容 结束标签 之分

处理思想是:通过一个handler,对开始标签,内容,结束标签各有一个处理函数

代码及注解

person 处理类

from xml.sax import handler,parseString
class PersonHandler(handler.ContentHandler):
  def __init__(self, db_ops):
    #db op obj
    self.db_ops = db_ops
    #存储一个person的map
    self.person = {}
    #当前的tag
    self.current_tag = ""
    #是否是tag之间的内容 ,目的拿到tag间内容,不受空白的干扰
    self.in_quote = 0
  #开始,清空map
  def startElement(self, name, attr):
    #以person,清空map
    if name == "person":
      self.person = {}
    #记录 状态
    self.current_tag = name
    self.in_quote = 1
  #结束,插入数据库
  def endElement(self, name):
    #以person结尾  代表读取一个person的信息结束
    if name == "person":
      #do something
      in_fields = tuple([ ('"' + self.person.get(i,"") + '"')  for i in fields ])
      print in_sql % in_fields
      db_ops.insert( in_sql%(in_fields))
    #处理
    self.in_quote = 0
  def characters(self, content):
    #若是在tag之间的内容,更新到map中
    if self.in_quote:
      self.person.update({self.current_tag: content})

加上入库的完整代码

#!/usr/bin/python
# -*- coding:utf-8 -*-
#parse_person.py
#version : 0.1
#author : wukunliang@163.com
#desc : parse person.xml and out sql


import sys,os
import MySQLdb

reload(sys)
sys.setdefaultencoding('utf-8')

in_sql = "insert into person(id,sex,address,fansNum,summary,wbNum,gzNum,blog,edu,work,renZh,brithday) values(%s, %s, %s, %s, %s, %s,
          %s, %s, %s, %s, %s, %s)"

fields = ("id","sex","address","fansNum","summary","wbNum","gzNum","blog","edu","work","renZh","brithday")

#数据库方法
class Db_Connect:
    def __init__(self, db_host, user, pwd, db_name, charset="utf8",  use_unicode = True):
        print "init begin"
        print db_host, user, pwd, db_name, charset , use_unicode
        self.conn = MySQLdb.Connection(db_host, user, pwd, db_name, charset=charset , use_unicode=use_unicode)
        print "init end"

    def insert(self, sql):
        try:
            n = self.conn.cursor().execute(sql)
            return n
        except MySQLdb.Warning, e:
            print "Error: execute sql '",sql,"' failed"

    def close(self):
        self.conn.close()

#person 处理类
from xml.sax import handler,parseString
class PersonHandler(handler.ContentHandler):
    def __init__(self, db_ops):
        #db op obj
        self.db_ops = db_ops
        #存储一个person的map
        self.person = {}
        #当前的tag
        self.current_tag = ""
        #是否是tag之间的内容
        self.in_quote = 0
    #开始,清空map
    def startElement(self, name, attr):
        #以person,清空map
        if name == "person":
          self.person = {}
        #记录 状态
        self.current_tag = name
        self.in_quote = 1
    #结束,插入数据库
    def endElement(self, name):
        #以person结尾  代表读取一个person的信息结束
        if name == "person":
            #do something
            in_fields = tuple([ ('"' + self.person.get(i,"") + '"')  for i in fields ])
            print in_sql % in_fields
            db_ops.insert( in_sql%(in_fields))
        #处理
        self.in_quote = 0
    def characters(self, content):
        #若是在tag之间的内容,更新到map中
        if self.in_quote:
            self.person.update({self.current_tag: content})

if __name__ == "__main__":
    f = open("./person.xml")
    #如果源文件gbk  转码      若是utf-8,去掉decode.encode
    db_ops = Db_Connect("127.0.0.1", "root", "root", "test")
    parseString(f.read().decode("gbk").encode("utf-8"), PersonHandler(db_ops))
    f.close()
    db_ops.close()

平时拿python来分析数据,工具脚本还有hadoop streamming,但是用的面和深度实在欠缺
只能说道行还浅,需要多多实践

The end!

2012-04-07