分类 Python 下的文章

Python中staticmethod方法和classmethod方法区别

前言

staticmethod和classmethod两个方法在python里是通过装饰器来实现的,语法分别是@staticmethod和@classmethod,本文就讨论下这两种方法的区别以及使用场景

定义方式差异

@classmethod和@staticmethod装饰方法时,对于被装饰方法本身定义有差异,主要体现在形参上。

@classmethod
#第一个参数是类本身
def class_method(cls, data):

@staticmethod
#不存在任何与类、实例相关的参数,包括cls、self
def static_method(data):

- 阅读剩余部分 -

Python re模块

re 模块

match

import re
example = "Hello World, I am 24 years old."
pattern = re.compile(r"(\S+) (\S+) (\S*) (\S+) (\d+) (\S+) (\S+)")
m = pattern.match(example)
print m.group(0)    # Hello World, I am 24 years old.
print m.group(1)    # Hello
print m.group(2)    # World

or

m = re.match(r'(\S+) (\S+) (\S*) (\S+) (\d+) (\S+) (\S+)', example)
print m.group()

- 阅读剩余部分 -

Python下unicode和string编码

问题

#!/usr/bin/python
# -*- coding: utf-8 -*-

s = "你好"
s.encode("utf-8")

运行结果报错

UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)

报错原因

在python里面,编码和解码的关系如下:

unicode -> string 编码
string -> unicode 解码

上面的s变量其实是string类型,如果尝试对s进行encode,那么必须对s变量进行解码成unicode,然后再编码成string。但是在s变量解码的过程中,python会根据系统默认的解码方式进行解码,根据报错可以看出是python默认的解码方式是"ascii",sys.getdefaultencoding()可以查看。但是因为文件开头指定了s变量又是utf-8的编码方式,所以就冲突了,做法就是在编码之前指定"utf-8"进行解码。

如何判断某个字符串的编码?

isinstance(s, str) 用来判断是否为一般字符串

isinstance(s, unicode) 用来判断是否为unicode
#!/usr/bin/python
# -*- coding: utf-8 -*-

string = "你好"
string.decode('utf-8').encode("utf-8")

还有一种方式就是修改python默认编码解码方式

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

Python Gevent应用

Gevent介绍

看了下gevent,最好的资料还是gevent程序员指南,亲自用了下gevent,因为是第一次真正接触coroutine,所以还是有点抽象,碰到了一些问题。

关于coroutine

淺談coroutine與gevent

gevent应用

场景:有100个url,要对这100个url并发发起请求,通过个url丢进去,然后执行完成后,100个url请求的结果独立返回给我,如果是true那么就是返回请求的结果,如果是false,那么返回错误的原因。

- 阅读剩余部分 -

Python参数解析optparse

# *-* coding: UTF-8 *-*

import sys
from optparse import OptionParser

def option_parser():
    parser = OptionParser("%prog [options] arg",version="%prog 1.0")
   	
   	# store_true|store_false 表示 -v|-q 后面没有跟参数,指定-v,则verbose=True;指定-q,verbose=False
	parser.add_option("-v", action="store_true", dest="verbose")  
	parser.add_option("-q", action="store_false", dest="verbose") 

	# default 指定变量默认值,metavar
    parser.add_option("-D", "--directory", dest="file_path",metavar='FILE_PATH',
                        help="file directory .",default="/home/firefoxbug"),
                
    # action=append,可以指定多项参数+值     
    parser.add_option("-H", "--host",dest="mysql_host",metavar='MYSQL_HOST',
   						action="append", help="mysql ip(must)")
    
    # type 指定参数类型
    parser.add_option("-P", "--port",dest="mysql_port", type=int, help='mysql port .(default:8080)',default=80)

    (options, args) = parser.parse_args()
    print parser.print_help()
    if len(sys.argv) == 1:
            parser.error("incorrect number of arguments")
    return (parser,options, args)

action参数说明

action:
	store: 默认的设置,表示保存参数,比如 -d abc,就把 abc 保存到options变量里
	store_true|store_false: 可以不指定参数值,比如 -v,后面可以不跟值,store_true/store_false分别代表bool值
	append: 追加形式保存参数,比如 -d abc -d edf,得到的结果就是['abc', 'edf']

- 阅读剩余部分 -

Python常用代码笔记

参数解析

import sys
import getopt

try:
    opts,args = getopt.getopt(sys.argv[1:], "ho:",["help", "output="]);
    if not opts:
        usage()
    for opt,arg in opts:
        if opt in ("-h", "--help"):
            usage();
        elif opt in ("-H",):
            mysql_host = arg
        elif opt in ("-o",):
            parse_cpu_csv(arg)
        else:
            usage()
            sys.exit(1)
except getopt.GetoptError:
    usage();
    sys.exit(1)

- 阅读剩余部分 -

python中的@装饰器

先看一段代码

#!/usr/bin/python

def test(func):
    func()

@test
def fun():
    print "call fun"

上面的代码没有main()调用或者直接的函数调用,结果还是会输出

call fun

@修饰符有点像函数指针,python解释器发现执行的时候如果碰到@修饰的函数,首先就解析它,找到它对应的函数进行调用,并且会把@修饰下面一行的函数作为一个函数指针传入它对应的函数。有点绕口,这里说的“它对应的函数”就是名字是一样的。下面说下之前代码的解析流程

  1. python解释器发现@test,就去调用test函数
  2. test函数调用预先要指定一个参数,传入的就是@test下面修饰的函数,也就是fun()
  3. test()函数执行,调用fun(),fun()打印“call fun”

再看一段代码

#!/usr/bin/python

def test(func):
    func()
    print "call test over"

def main():
    @test
    def fun():
        print "call fun"
#main()

这样调用的话就不会调用test,只有当main函数调用的时候才会进入到main函数,然后调用test。
来看看正规的用法


def t(func):
    print "in t"
    def _a():
        print "in a"
        return func()
    print "call t over"
    return _a

@t
def hello():
    print "hello"

hello()

输出

in t
call t over
in a
hello

运行流程

  1. 执行hello(),发现有@修饰符,把hello()作为函数指针传入@修饰的函数t()
  2. t()函数执行到return _a,这时候会去调用_a()
  3. _a()函数执行func(),这里的func()就是之前传入了hello()
  4. 执行hello()函数的打印

应用场景:测试函数运行时间

from time import clock

def get_run_time(func):
    def _a():
        start = clock()
        func()
        end = clock()
        print end-start
    return _a

@get_run_time
def hello():
    print "hello\n"

hello()

注意一定要写成这样闭包的形式

### 如何传参数

def t(func):
    print "in t"
    def _a(* args, ** kwds):
        print "in a"
        return func(* args, ** kwds)
    print "call t over"
    return _a

@t
def hello(v):
    print "hello", v

hello("world")

更多用法

[http://www.cnblogs.com/rhcad/archive/2011/12/21/2295507.html](http://www.cnblogs.com/rhcad/archive/2011/12/21/2295507.html)

python解析配置文件

项目中经常需要解析配置文件,最简单的就是有很多个section,然后每个section里面都是很多option,每一项option的组成都是key=value,怎么解析这个配置文件?python有内置的库可以解决,比如下面是一个配置文件的demo

#WatchDir 配置文件
#配置的value的时候除了第一项就先别加单引号或者双引号了

[monitor]
#监控路径
WatchPath='/var/log/'

#数据写到本地磁盘cache时间间隔,单位:S
Interval=10


#是否以守护进程方式运行
Daemon=False

#mysql配置
[mysql]
mysql_host = 127.0.0.1
mysql_username = root
mysql_password = 12345678
mysql_port = 3306
mysql_dbname = db_a
mysql_table = tb_b

- 阅读剩余部分 -

Python日志模块

日志的重要性不言而喻,之前写各种程序最纠结的就是这部分了,每次都自己定义日志格式,以后还是用标准的python日志模块吧。Python的日志类型也是Nginx和Syslog一样,是分等级的。 NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL 如果把looger的级别设置为INFO, 那么小于INFO级别的日志都不输出, 大于等于INFO级别的日志都输出

- 阅读剩余部分 -

Python watchdog

想监控本地文件系统下的某个目录,原本是想用pyinotify的,谁知道mac的最新系统不支持,于是找了watchdog。根据官网的API做了一份和pyinotify的demo。

<pre>
#!/usr/bin/python
# -*- coding: UTF-8 -*-

# author : firefoxbug
# E-Mail : wanghuafire@gmail.com
# Blog   : www.firefoxbug.net

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
	def on_created(self,event):
		if event.is_directory:
			print event.event_type,event.src_path
		else :
			print event.event_type,event.src_path

	def on_deleted(self,event):
		if event.is_directory:
			print event.event_type,event.src_path
		else :
			print event.event_type,event.src_path

	def on_modified(self,event):
		if not event.is_directory:
			print event.event_type,event.src_path

	def on_moved(self,event):
		print "move",event.src_path,event.dest_path

if __name__ == "__main__":
	event_handler = MyHandler()
	observer = Observer()
	observer.schedule(event_handler, path='.', recursive=True)
	observer.start()

	try:
		print "started myWatch"
		while True:
			time.sleep(1)
	except KeyboardInterrupt:
		observer.stop()
	observer.join()
</pre>

详细的可以参考 http://pythonhosted.org/watchdog/

Python redis操作

首先安装python redis扩展模块 yum install python-redis

#!/usr/bin/env python2.6
#-*- coding: utf-8 -*-

# Author : firefoxbug
# E-Mail : wanghuafire@gmail.com
# Blog   : www.firefoxbug.net
# Function : put or get log dictionary from redis-server

import redis  

global redis_q

class RedisQueue(object):
    """Simple Queue with Redis Backend"""
    def __init__(self,**redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db= redis.Redis(**redis_kwargs)
#       self.key = '%s:%s' %(namespace, name)  

    def qsize(self,key):
        """Return the approximate size of the queue."""
        return self.__db.llen(key)

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0  

    def put(self,key,item):
        """Put item into the queue."""
        self.__db.rpush(key, item)  

    def get(self,key,block=True, timeout=None):
        """Remove and return an item from the queue.  

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self.__db.blpop(key, timeout=timeout)
        else:
            item = self.__db.lpop(key)  

        if item:
            item = item[1]
        return item  

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False) 

def connect2redis(host='127.0.0.1'):
    global redis_q
    redis_q = RedisQueue()

if __name__ == '__main__':
    redis_q = RedisQueue(host='127.0.0.1')
    i = 0
    while True:
                redis_q.put('test2','hello world')
        print redis_q.get('test2')

Python QQ纯真库解析

QQWry.Dat不是普通的txt文件,所以python解析要特定的程序。下面是一个Python类

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# author : firefoxbug
# E-Mail : wanghuafire@gmail.com
# Blog   : www.firefoxbug.net

import sys
import socket
from struct import pack, unpack

class IPInfo(object):
    '''QQWry.Dat数据库查询功能集合
    '''
    def __init__(self, dbname):

        self.dbname = dbname
        f = file(dbname, 'r')
        self.img = f.read()
        f.close()

        (self.firstIndex, self.lastIndex) = unpack('II', self.img[:8])
        self.indexCount = (self.lastIndex - self.firstIndex) / 7 + 1

    def getString(self, offset = 0):
        o2 = self.img.find('\0', offset)
        gb2312_str = self.img[offset:o2]
        try:
            utf8_str = unicode(gb2312_str,'gb2312').encode('utf-8')
        except:
            return '未知'
        return utf8_str

    def getLong3(self, offset = 0):
        s = self.img[offset: offset + 3]
        s += '\0'
        return unpack('I', s)[0]

    def getAreaAddr(self, offset = 0):
        ''' 通过给出偏移值,取得区域信息字符串,'''

        byte = ord(self.img[offset])
        if byte == 1 or byte == 2:
            p = self.getLong3(offset + 1)
            return self.getAreaAddr(p)
        else:
            return self.getString(offset)

    def getAddr(self, offset, ip = 0):
        img = self.img
        o = offset
        byte = ord(img[o])

        if byte == 1:
            return self.getAddr(self.getLong3(o + 1))

        if byte == 2:
            cArea = self.getAreaAddr(self.getLong3(o + 1))
            o += 4
            aArea = self.getAreaAddr(o)
            return (cArea, aArea)

        if byte != 1 and byte != 2:

            cArea = self.getString(o)
            o = self.img.find('\0',o) + 1
            aArea = self.getString(o)
            return (cArea, aArea)

    def find(self, ip, l, r):
        ''' 使用二分法查找网络字节编码的IP地址的索引记录'''
        if r - l <= 1:
            return l

        m = (l + r) / 2
        o = self.firstIndex + m * 7
        new_ip = unpack('I', self.img[o: o+4])[0]
        if ip <= new_ip:
            return self.find(ip, l, m)
        else:
            return self.find(ip, m, r)

    def getIPAddr(self, ip):
        ip = unpack('!I', socket.inet_aton(ip))[0]
        i = self.find(ip, 0, self.indexCount - 1)
        o = self.firstIndex + i * 7

        o2 = self.getLong3(o + 4)

        (c, a) = self.getAddr(o2 + 4)
        return (c, a)

    def output(self, first, last):
        for i in range(first, last):
            o = self.firstIndex +  i * 7
            ip = socket.inet_ntoa(pack('!I', unpack('I', self.img[o:o+4])[0]))
            offset = self.getLong3(o + 4)
            (c, a) = self.getAddr(offset + 4)
            print "%s %d %s/%s" % (ip, offset, c, a)

'''search ip and its'localtion '''
def search_ip(ip_instance,ipaddr):
    if ipaddr :
        try :
            (c, a) = ip_instance.getIPAddr(ipaddr)
            print '%s/%s'%(c,a)
        except :
            print 'Unknow/Unknow'

if __name__ == '__main__':
    ip_instance = IPInfo('./QQWry.Dat')
    search_ip(ip_instance,"8.8.8.8")

纯真库优势就是可以精确到公司,街道,网吧,但是有麻烦的一点就是统计起来不方便,因为都是一个字符串,需要再做分词。

Python读取fifo管道

Shell读取fifo的管道是很方便的,每一次read的时候都可以阻塞,只要管道两边都对接好。但Python在读取管道的时候我就一直没有找到好的解决方案,也是想每次读取fifo里面的buffer,没有buffer就阻塞着。下面一段代码可以实现这个功能,但是还是感觉很笨。

<pre>
pipe = "test.fifo"
...
read_cmd = "read -r a < " + pipe +";echo \"$a\""
lines = os.popen(read_cmd).read().split("\n")
line_str = lines[0]
</pre>

Python Mysql 连接断开错误代码2006

我本地用Python连接Mysql,可能是由于写得太多,导致Mysql出现2006错误,出现MySQL server has gone away,应该就是Mysql断开的连接。后来发现只要加Mysql的ping功能就行。

class MySQLHelper:
	def __init__(self,host,user,password,db_name,charset="utf8"):
		self.host = host
		self.user = user
		self.password = password
		self.db_name = db_name
		self.charset = charset
		self.conn = MySQLdb.connect(host=self.host,user=self.user,passwd=self.password,db=self.db_name, charset=self.charset,cursorclass=MySQLdb.cursors.DictCursor)
                self.conn.ping(True)
	def insert_sql_cmd(self,sql_cmd):
		self.cur = self.conn.cursor()
		try :
			self.cur.execute(sql_cmd)
			self.conn.commit()
			self.cur.close()
			return True
		except Exception, e:
			print "[MYSQL ERROR] : %s"%sql_cmd
			print "%s"%(str(e))
			self.cur.close()
			return False

	def update_sql_cmd(self,sql_cmd):
		self.cur = self.conn.cursor()
		try :
			self.cur.execute(sql_cmd)
			self.conn.commit()
			self.cur.close()
			return True
		except Exception, e:
			print "[MYSQL ERROR] : %s"%sql_cmd
			print "%s"%(str(e))
			self.cur.close()
			return False

	def query_sql_cmd(self,sql_cmd):
		try :
			self.cur = self.conn.cursor()
			self.cur.execute(sql_cmd)
			res = self.cur.fetchall()
			self.cur.close()
			return res
		except Exception, e:
			print "[MYSQL ERROR] : %s"%sql_cmd
			print "%s"%(str(e))
			self.cur.close()
			return False

	def close(self):
		self.conn.close()

Linux下I/O模型介绍

阻塞I/O


阻塞 I/O 模型是最原始的模型,进程一旦执行某个函数调用,进程就进入休眠状态(Sleeping)。比如平时FIFO管道的 read,还有基于TCP的流socket的 read 调用,进程一旦进行系统函数的调用,会从用户态切入内核态,内核会进行系统调用 read ,这时候如果对应的流(管道,socket都算)还没准备写入数据,那么 read 函数就会阻塞,从而导致进程挂起。直到数据来了,内核才会把数据拷贝从内核的缓冲区到进程用户的缓冲区。这时候 read 函数才能返回,进程才能向下走,继续下面的处理。整个过程是串行的,必须一步一步来。

- 阅读剩余部分 -