标签 Python 下的文章

Python中staticmethod方法和classmethod方法区别

前言

staticmethod和classmethod两个方法在python里是通过装饰器来实现的,语法分别是@staticmethod和@classmethod,本文就讨论下这两种方法的区别以及使用场景

定义方式差异

@classmethod和@staticmethod装饰方法时,对于被装饰方法本身定义有差异,主要体现在形参上。

@classmethod
#第一个参数是类本身
def class_method(cls, data):

@staticmethod
#不存在任何与类、实例相关的参数,包括cls、self
def static_method(data):

- 阅读剩余部分 -

Python下unicode和string编码

问题

#!/usr/bin/python
# -*- coding: utf-8 -*-

s = "你好"
s.encode("utf-8")

运行结果报错

UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)

报错原因

在python里面,编码和解码的关系如下:

unicode -> string 编码
string -> unicode 解码

上面的s变量其实是string类型,如果尝试对s进行encode,那么必须对s变量进行解码成unicode,然后再编码成string。但是在s变量解码的过程中,python会根据系统默认的解码方式进行解码,根据报错可以看出是python默认的解码方式是"ascii",sys.getdefaultencoding()可以查看。但是因为文件开头指定了s变量又是utf-8的编码方式,所以就冲突了,做法就是在编码之前指定"utf-8"进行解码。

如何判断某个字符串的编码?

isinstance(s, str) 用来判断是否为一般字符串

isinstance(s, unicode) 用来判断是否为unicode
#!/usr/bin/python
# -*- coding: utf-8 -*-

string = "你好"
string.decode('utf-8').encode("utf-8")

还有一种方式就是修改python默认编码解码方式

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

Python Gevent应用

Gevent介绍

看了下gevent,最好的资料还是gevent程序员指南,亲自用了下gevent,因为是第一次真正接触coroutine,所以还是有点抽象,碰到了一些问题。

关于coroutine

淺談coroutine與gevent

gevent应用

场景:有100个url,要对这100个url并发发起请求,通过个url丢进去,然后执行完成后,100个url请求的结果独立返回给我,如果是true那么就是返回请求的结果,如果是false,那么返回错误的原因。

- 阅读剩余部分 -

Python常用代码笔记

参数解析

import sys
import getopt

try:
    opts,args = getopt.getopt(sys.argv[1:], "ho:",["help", "output="]);
    if not opts:
        usage()
    for opt,arg in opts:
        if opt in ("-h", "--help"):
            usage();
        elif opt in ("-H",):
            mysql_host = arg
        elif opt in ("-o",):
            parse_cpu_csv(arg)
        else:
            usage()
            sys.exit(1)
except getopt.GetoptError:
    usage();
    sys.exit(1)

- 阅读剩余部分 -

python中的@装饰器

先看一段代码

#!/usr/bin/python

def test(func):
    func()

@test
def fun():
    print "call fun"

上面的代码没有main()调用或者直接的函数调用,结果还是会输出

call fun

@修饰符有点像函数指针,python解释器发现执行的时候如果碰到@修饰的函数,首先就解析它,找到它对应的函数进行调用,并且会把@修饰下面一行的函数作为一个函数指针传入它对应的函数。有点绕口,这里说的“它对应的函数”就是名字是一样的。下面说下之前代码的解析流程

  1. python解释器发现@test,就去调用test函数
  2. test函数调用预先要指定一个参数,传入的就是@test下面修饰的函数,也就是fun()
  3. test()函数执行,调用fun(),fun()打印“call fun”

再看一段代码

#!/usr/bin/python

def test(func):
    func()
    print "call test over"

def main():
    @test
    def fun():
        print "call fun"
#main()

这样调用的话就不会调用test,只有当main函数调用的时候才会进入到main函数,然后调用test。
来看看正规的用法


def t(func):
    print "in t"
    def _a():
        print "in a"
        return func()
    print "call t over"
    return _a

@t
def hello():
    print "hello"

hello()

输出

in t
call t over
in a
hello

运行流程

  1. 执行hello(),发现有@修饰符,把hello()作为函数指针传入@修饰的函数t()
  2. t()函数执行到return _a,这时候会去调用_a()
  3. _a()函数执行func(),这里的func()就是之前传入了hello()
  4. 执行hello()函数的打印

应用场景:测试函数运行时间

from time import clock

def get_run_time(func):
    def _a():
        start = clock()
        func()
        end = clock()
        print end-start
    return _a

@get_run_time
def hello():
    print "hello\n"

hello()

注意一定要写成这样闭包的形式

### 如何传参数

def t(func):
    print "in t"
    def _a(* args, ** kwds):
        print "in a"
        return func(* args, ** kwds)
    print "call t over"
    return _a

@t
def hello(v):
    print "hello", v

hello("world")

更多用法

[http://www.cnblogs.com/rhcad/archive/2011/12/21/2295507.html](http://www.cnblogs.com/rhcad/archive/2011/12/21/2295507.html)

python解析配置文件

项目中经常需要解析配置文件,最简单的就是有很多个section,然后每个section里面都是很多option,每一项option的组成都是key=value,怎么解析这个配置文件?python有内置的库可以解决,比如下面是一个配置文件的demo

#WatchDir 配置文件
#配置的value的时候除了第一项就先别加单引号或者双引号了

[monitor]
#监控路径
WatchPath='/var/log/'

#数据写到本地磁盘cache时间间隔,单位:S
Interval=10


#是否以守护进程方式运行
Daemon=False

#mysql配置
[mysql]
mysql_host = 127.0.0.1
mysql_username = root
mysql_password = 12345678
mysql_port = 3306
mysql_dbname = db_a
mysql_table = tb_b

- 阅读剩余部分 -

Python redis操作

首先安装python redis扩展模块 yum install python-redis

#!/usr/bin/env python2.6
#-*- coding: utf-8 -*-

# Author : firefoxbug
# E-Mail : wanghuafire@gmail.com
# Blog   : www.firefoxbug.net
# Function : put or get log dictionary from redis-server

import redis  

global redis_q

class RedisQueue(object):
    """Simple Queue with Redis Backend"""
    def __init__(self,**redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db= redis.Redis(**redis_kwargs)
#       self.key = '%s:%s' %(namespace, name)  

    def qsize(self,key):
        """Return the approximate size of the queue."""
        return self.__db.llen(key)

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0  

    def put(self,key,item):
        """Put item into the queue."""
        self.__db.rpush(key, item)  

    def get(self,key,block=True, timeout=None):
        """Remove and return an item from the queue.  

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self.__db.blpop(key, timeout=timeout)
        else:
            item = self.__db.lpop(key)  

        if item:
            item = item[1]
        return item  

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False) 

def connect2redis(host='127.0.0.1'):
    global redis_q
    redis_q = RedisQueue()

if __name__ == '__main__':
    redis_q = RedisQueue(host='127.0.0.1')
    i = 0
    while True:
                redis_q.put('test2','hello world')
        print redis_q.get('test2')

Python QQ纯真库解析

QQWry.Dat不是普通的txt文件,所以python解析要特定的程序。下面是一个Python类

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# author : firefoxbug
# E-Mail : wanghuafire@gmail.com
# Blog   : www.firefoxbug.net

import sys
import socket
from struct import pack, unpack

class IPInfo(object):
    '''QQWry.Dat数据库查询功能集合
    '''
    def __init__(self, dbname):

        self.dbname = dbname
        f = file(dbname, 'r')
        self.img = f.read()
        f.close()

        (self.firstIndex, self.lastIndex) = unpack('II', self.img[:8])
        self.indexCount = (self.lastIndex - self.firstIndex) / 7 + 1

    def getString(self, offset = 0):
        o2 = self.img.find('\0', offset)
        gb2312_str = self.img[offset:o2]
        try:
            utf8_str = unicode(gb2312_str,'gb2312').encode('utf-8')
        except:
            return '未知'
        return utf8_str

    def getLong3(self, offset = 0):
        s = self.img[offset: offset + 3]
        s += '\0'
        return unpack('I', s)[0]

    def getAreaAddr(self, offset = 0):
        ''' 通过给出偏移值,取得区域信息字符串,'''

        byte = ord(self.img[offset])
        if byte == 1 or byte == 2:
            p = self.getLong3(offset + 1)
            return self.getAreaAddr(p)
        else:
            return self.getString(offset)

    def getAddr(self, offset, ip = 0):
        img = self.img
        o = offset
        byte = ord(img[o])

        if byte == 1:
            return self.getAddr(self.getLong3(o + 1))

        if byte == 2:
            cArea = self.getAreaAddr(self.getLong3(o + 1))
            o += 4
            aArea = self.getAreaAddr(o)
            return (cArea, aArea)

        if byte != 1 and byte != 2:

            cArea = self.getString(o)
            o = self.img.find('\0',o) + 1
            aArea = self.getString(o)
            return (cArea, aArea)

    def find(self, ip, l, r):
        ''' 使用二分法查找网络字节编码的IP地址的索引记录'''
        if r - l <= 1:
            return l

        m = (l + r) / 2
        o = self.firstIndex + m * 7
        new_ip = unpack('I', self.img[o: o+4])[0]
        if ip <= new_ip:
            return self.find(ip, l, m)
        else:
            return self.find(ip, m, r)

    def getIPAddr(self, ip):
        ip = unpack('!I', socket.inet_aton(ip))[0]
        i = self.find(ip, 0, self.indexCount - 1)
        o = self.firstIndex + i * 7

        o2 = self.getLong3(o + 4)

        (c, a) = self.getAddr(o2 + 4)
        return (c, a)

    def output(self, first, last):
        for i in range(first, last):
            o = self.firstIndex +  i * 7
            ip = socket.inet_ntoa(pack('!I', unpack('I', self.img[o:o+4])[0]))
            offset = self.getLong3(o + 4)
            (c, a) = self.getAddr(offset + 4)
            print "%s %d %s/%s" % (ip, offset, c, a)

'''search ip and its'localtion '''
def search_ip(ip_instance,ipaddr):
    if ipaddr :
        try :
            (c, a) = ip_instance.getIPAddr(ipaddr)
            print '%s/%s'%(c,a)
        except :
            print 'Unknow/Unknow'

if __name__ == '__main__':
    ip_instance = IPInfo('./QQWry.Dat')
    search_ip(ip_instance,"8.8.8.8")

纯真库优势就是可以精确到公司,街道,网吧,但是有麻烦的一点就是统计起来不方便,因为都是一个字符串,需要再做分词。

Linux下I/O模型介绍

阻塞I/O


阻塞 I/O 模型是最原始的模型,进程一旦执行某个函数调用,进程就进入休眠状态(Sleeping)。比如平时FIFO管道的 read,还有基于TCP的流socket的 read 调用,进程一旦进行系统函数的调用,会从用户态切入内核态,内核会进行系统调用 read ,这时候如果对应的流(管道,socket都算)还没准备写入数据,那么 read 函数就会阻塞,从而导致进程挂起。直到数据来了,内核才会把数据拷贝从内核的缓冲区到进程用户的缓冲区。这时候 read 函数才能返回,进程才能向下走,继续下面的处理。整个过程是串行的,必须一步一步来。

- 阅读剩余部分 -

Python socket tcp

Server

#!/usr/bin/python

'''
	tcp socket server
'''

import sys
import socket

HOST = ''  
PORT = 9243
DATA_BUFFER = 4096

# create a TCP socket
try :
	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
	print 'Socket created'
except socket.error, msg :
	print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
	sys.exit()

# Bind socket to local host and port
try:
	s.bind((HOST, PORT))
except socket.error , msg:
	print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
	sys.exit()

s.listen(5)			# allow 5 simultaneous

while True:
	# wait for next client to connect
	connection, address = s.accept()		# connection is a new socket
	while True:
		data = connection.recv(DATA_BUFFER) # receive up to 1K bytes
		if data:
			print data
		else:
			break
	connection.close()						# close socket

Client


#!/usr/bin/python

'''
	tcp socket client
'''

import socket  
  
address = ('223.4.238.138', 9243) 
# create a TCP socket
try :
	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
	print 'Socket created'
except socket.error, msg :
	print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
	sys.exit()	

# create a TCP socket
try :
	s.connect(address)
	print 'Connect successfully'
except socket.error, msg :
	print 'Failed to Connect . Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
	sys.exit()

s.send('hello world')  

s.close()  

Python open模式

open(file_name,mode)

w 以写方式打开,
a 以追加模式打开 (从 EOF 开始, 必要时创建新文件)
r+ 以读写模式打开
w+ 以读写模式打开 (参见 w )
a+ 以读写模式打开 (参见 a )
rb 以二进制读模式打开
wb 以二进制写模式打开 (参见 w )
ab 以二进制追加模式打开 (参见 a )
rb+ 以二进制读写模式打开 (参见 r+ )
wb+ 以二进制读写模式打开 (参见 w+ )
ab+ 以二进制读写模式打开 (参见 a+ )

注意:

1、使用'w',文件若存在,首先要清空,然后(重新)创建,
2、使用'a'模式 ,把所有要写入文件的数据都追加到文件的末尾,即使你使用了seek()指向文件的其他地方,如果文件不存在,将自动被创建。

Python 发送HEAD包

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import urllib2
import socket

def send(url):
	request = urllib2.Request(url,)
	try:
		request.get_method = lambda: 'HEAD'
		response = urllib2.urlopen(request)
		msg = response.msg
		print "message : %s"%msg
		headers = response.headers
		print "\n\n%s"%headers
		data = response.read()
		print "data\n%s"%data
	except urllib2.HTTPError,e:
		code = e.code
		print code
	except Exception,e:
		print e

socket.setdefaulttimeout(2)
send("http://74.91.23.207/")

Head方法要求响应与GET请求一样,但是没有响应体(response body)。如果我们只对关于网页或资源的
信息感兴趣,而不想检索资源本身的全部内容,可以使用HEAD命令。HEAD的使用方法与GET相同,只是不返
回Web页的正文内容。

Python Mysql类

Python支持Mysql的库

# yum -y install  MySQL-python

import MySQLdb

mysql_instance = "";
class MySQLHelper:
	def __init__(self,host,user,password,db_name,charset="utf8"):
		self.host = host
		self.user = user
		self.password = password
		self.db_name = db_name
		self.charset = charset
		try:
			self.conn = MySQLdb.connect(host=self.host,user=self.user,passwd=self.password,db=self.db_name, charset=self.charset)
		except:
			print("Mysql Connect Error ")
			sys.exit(0)

	def update_sql_cmd(self,sql_cmd):
		self.cur = self.conn.cursor()
		try :
			self.cur.execute(sql_cmd)
			self.conn.commit()
			self.cur.close()
			return True
		except Exception, e:
			print "[MYSQL ERROR] : %s"%sql_cmd
			self.cur.close()
			return False

	def run_sql_cmd(self,sql_cmd):
		try :
			self.cur = self.conn.cursor()
			self.cur.execute(sql_cmd)
			res = self.cur.fetchall()
			self.cur.close()
			return res
		except Exception, e:
			print "[MYSQL ERROR] : %s"%sql_cmd
			self.cur.close()
			return False

	def close(self):
		self.conn.close()

def mysql_init():
	'''connect to mysql database opencdn'''
	try:
		global mysql_instance
		mysql_instance = MySQLHelper(mysql_host,mysql_username,mysql_password,mysql_dbname)
	except Exception, e:
		print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
		sys.exit(1)

update_sql_cmd()可以对mysql进行插入操作
run_sql_cmd()可以对mysql查询操作并且返回二元组

python进程池

python自2.6开始提供了多进程模块multiprocessing,进程池使用multiprocessing.pool,pool的构造如下:

```
multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
```

* processes: 表示pool中进程的数目,默认地为当前CPU的核数。
* initializer: 表示新进程的初始化函数。
* initargs: 表示新进程的初始化函数的参数。
* maxtasksperchild: 表示每个进程执行task的最大数目

```
apply_async(func[, args[, kwds[, callback]]])
```

主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会退出。虽然 apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程。
如果我们对返回结果不感兴趣, 那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要在close或terminate之后调用。

```
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import multiprocessing
import time

def func(msg):
print msg,os.getpid()
time.sleep(1)
return "done" + msg

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=12)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
print "wait"
for res in result:
print res.get()
print "Sub-process done.";
```

python HTTP请求包

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import urllib2
import socket

def send(url,host,body=""):
	socket.setdefaulttimeout(2)
	request = urllib2.Request(url,body)
	request.add_header('User-Agent', 'Firefox')
	request.add_header('Cache-Control', 'no-cache')
	request.add_header('Accept-Charset', 'ISO-8859-1,utf-8;q=0.7,*;q=0.7')
	request.add_header('Referer', "http://www.firefoxbug.com/")
	request.add_header('Keep-Alive',60)
	request.add_header('Connection', 'keep-alive')
	request.add_header('Host',host)
	try:
		response = urllib2.urlopen(request)
		code = response.code
		msg = response.msg
		print "status code %d"%code
		print "message : %s"%msg

		headers = response.headers
		print "\n\n%s"%headers

		data = response.read()
		print "data\n%s"%data
	except Exception,e:
		raise e

send("http://223.4.238.138","www.firefoxbug.net")

是否发送Post包可以通过Body字段是否为空来指定。

socket.setdefaulttimeout()来指定请求超时时间。

base64编码

By firefoxbug

shell base64编码


[root@firefoxbug] str="hello world"

[root@firefoxbug] echo -n "$str" | base64

aGVsbG8gd29ybGQ=

[root@firefoxbug] echo "$str" | base64 注意要加 -n ,不然会有换行,编码就有问题了。

aGVsbG8gd29ybGQK

shell base64解码


-d :decode

[root@firefoxbug]  echo -n "$str" | base64 | base64 -d

hello world

python base64编码


[code]
#!/usr/bin/python
#-*-coding:utf-8-*-

import base64
sour_str = "hello world"
print base64.encodestring(sour_str)
[/code]

python base64解码


[code]
#!/usr/bin/python
#-*-coding:utf-8-*-

import base64
sour_str = "hello world"
decode_str = base64.encodestring(sour_str)
print base64.decodestring(decode_str)
[/code]