Python Tornado框架教程
Tornado 是一个 Python 异步网络框架和 Web 服务器，最初由 FriendFeed 开发；FriendFeed 被 Facebook 收购后，Tornado 随即开源。它以高性能和非阻塞 I/O 著称，特别适合处理长轮询、WebSocket 以及其他需要持久连接的应用。
目录
- 安装与快速开始
- 基本路由与请求处理
- 模板系统
- 异步处理
- WebSocket
- 数据库集成
- 用户认证
- 部署与优化
- 测试
- 实际应用示例
安装与快速开始
安装 Tornado
pip install tornado
最小应用
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
    """Handler for the application root that returns a fixed greeting."""

    def get(self):
        """Answer a GET request with the greeting text."""
        greeting = "Hello, Tornado!"
        self.write(greeting)
def make_app():
    """Build the Application whose only route maps "/" to MainHandler."""
    routes = [
        (r"/", MainHandler),
    ]
    return tornado.web.Application(routes)
if __name__ == "__main__":
    # Build the application, bind it to port 8888, then block inside the
    # event loop so requests can be served.
    app = make_app()
    app.listen(8888)
    event_loop = tornado.ioloop.IOLoop.current()
    event_loop.start()
运行后访问 http://localhost:8888 即可看到 "Hello, Tornado!"。
基本路由与请求处理
路由配置
class HomeHandler(tornado.web.RequestHandler):
    """Handler that serves the home page text."""

    def get(self):
        """Answer a GET request with the home page body."""
        page_text = "Home Page"
        self.write(page_text)
class AboutHandler(tornado.web.RequestHandler):
    """Handler that serves the about page text."""

    def get(self):
        """Answer a GET request with the about page body."""
        page_text = "About Page"
        self.write(page_text)
def make_app():
    """Build the Application with the "/" and "/about" routes."""
    routes = [
        (r"/", HomeHandler),
        (r"/about", AboutHandler),
    ]
    return tornado.web.Application(routes)
HTTP 方法处理
class ResourceHandler(tornado.web.RequestHandler):
    """Echo which HTTP method was used on a numbered resource.

    Every method receives ``resource_id``, the group captured from the
    URL pattern the handler is registered under.
    """

    def get(self, resource_id):
        """Report a GET on the resource."""
        message = f"GET resource {resource_id}"
        self.write(message)

    def post(self, resource_id):
        """Report a POST to the resource."""
        message = f"POST to resource {resource_id}"
        self.write(message)

    def put(self, resource_id):
        """Report a PUT on the resource."""
        message = f"PUT resource {resource_id}"
        self.write(message)

    def delete(self, resource_id):
        """Report a DELETE on the resource."""
        message = f"DELETE resource {resource_id}"
        self.write(message)
# The regex group ([0-9]+) is handed to the handler methods as resource_id.
app = tornado.web.Application(
    handlers=[(r"/resource/([0-9]+)", ResourceHandler)],
)
请求参数获取
class ParamsHandler(tornado.web.RequestHandler):
    """Echo query-string, form and JSON parameters back to the client.

    The handlers reflect user-controlled values in the response body.
    Tornado's default Content-Type is ``text/html``, so the responses are
    explicitly declared as plain text to keep a browser from interpreting
    the echoed input as markup.
    """

    def get(self):
        """Greet the caller using the ``name`` and ``color`` query arguments."""
        # Single query argument, falling back to "Guest" when absent.
        name = self.get_query_argument("name", default="Guest")
        # Every value of a repeated query argument (empty list if none).
        colors = self.get_query_arguments("color")
        self.set_header("Content-Type", "text/plain; charset=UTF-8")
        self.write(f"Hello {name}, colors: {', '.join(colors)}")

    def post(self):
        """Echo the ``username``/``interest`` form fields and any JSON body.

        ``username`` is required: ``get_body_argument`` is called without a
        default, so Tornado rejects the request when the field is missing.
        """
        # Required form field.
        username = self.get_body_argument("username")
        # Every value of a repeated form field (empty list if none).
        interests = self.get_body_arguments("interest")
        # Try to read the raw body as JSON; a body that is not valid JSON
        # (for example a form-encoded one) yields an empty dict.
        try:
            data = tornado.escape.json_decode(self.request.body)
        except ValueError:
            data = {}
        self.set_header("Content-Type", "text/plain; charset=UTF-8")
        self.write(f"Username: {username}, Interests: {interests}, JSON: {data}")
响应处理
class ResponseHandler(tornado.web.RequestHandler):
    """Show the response APIs: status code, headers, cookie and body."""

    def get(self):
        """Send a plain-text response with a custom header and a cookie."""
        # Status code.
        self.set_status(200)
        # Response headers.
        response_headers = (
            ("Content-Type", "text/plain"),
            ("X-Custom-Header", "Value"),
        )
        for header_name, header_value in response_headers:
            self.set_header(header_name, header_value)
        # Cookie that expires after one day.
        self.set_cookie("session_id", "12345", expires_days=1)
        # A redirect could be issued at this point instead of a body,
        # e.g. self.redirect("/new-location").
        # Write the body and end the response in a single call.
        self.finish("This is the response body")
模板系统
基本模板使用
创建模板文件 templates/index.html：
<!DOCTYPE html>
<html>
  <head>
    <title>{{ title }}</title>
  </head>
  <body>
    {# Expressions are autoescaped by Tornado's template engine by default,
       so each item is written directly; wrapping it in escape() as well
       would escape characters such as "&" and "<" twice. #}
    <h1>{{ header }}</h1>
    <ul>
      {% for item in items %}
      <li>{{ item }}</li>
      {% end %}
    </ul>
  </body>
</html>
运行 HTML
在Handler中使用模板:
class TemplateHandler(tornado.web.RequestHandler):
    """Render index.html with a title, a header and a three-item list."""

    def get(self):
        """Build the template context and render the page."""
        context = {
            "title": "My Page",
            "header": "Welcome!",
            "items": [f"Item {number}" for number in (1, 2, 3)],
        }
        self.render("index.html", **context)
模板继承
基础模板 templates/base.html：
<!DOCTYPE html>
<html>
<!-- Base layout: it declares two blocks, "title" (with a default value) and "content" (empty by default). -->
<head>
<title>{% block title %}Default Title{% end %}</title>
</head>
<body>
{% block content %}{% end %}
</body>
</html>
运行 HTML
子模板 templates/page.html：
{% extends "base.html" %}
<!-- Child template: supplies its own "title" and "content" blocks for the layout it extends. -->
{% block title %}Page Title{% end %}
{% block content %}
<h1>Page Content</h1>
<p>This is the page content.</p>
{% end %}
运行 HTML
异步处理
协程处理
from tornado import gen
class AsyncHandler(tornado.web.RequestHandler):
    """Decorator-based (generator) coroutine handler."""

    @gen.coroutine
    def do_something_async(self):
        """Simulate a slow operation: wait one second, then yield "Done"."""
        yield gen.sleep(1)
        # On Python 3 a plain return inside a generator coroutine carries
        # the result, exactly like raising gen.Return.
        return "Done"

    @gen.coroutine
    def get(self):
        """Wait for the simulated operation and write its result."""
        outcome = yield self.do_something_async()
        self.write(f"Result: {outcome}")
原生 async/await (Python 3.5+)
class NativeAsyncHandler(tornado.web.RequestHandler):
    """Same flow as the generator-based handler, using async/await."""

    async def do_something_async(self):
        """Simulate a slow operation: sleep one second, then return "Done"."""
        await gen.sleep(1)
        return "Done"

    async def get(self):
        """Await the simulated operation and write its result."""
        outcome = await self.do_something_async()
        self.write(f"Result: {outcome}")
并行请求
class ParallelHandler(tornado.web.RequestHandler):
    """Fetch two URLs concurrently and report the size of each body."""

    async def get(self):
        """Start both fetches, wait for both, then write the body lengths."""
        client = AsyncHTTPClient()
        # Both fetches are started before either one is awaited, so they
        # run in parallel; gen.multi waits for the whole batch.
        pending = [
            client.fetch(url)
            for url in ("http://example.com", "http://example.org")
        ]
        com_response, org_response = await gen.multi(pending)
        com_size = len(com_response.body)
        org_size = len(org_response.body)
        self.write(
            f"Got {com_size} from example.com and {org_size} from example.org"
        )
WebSocket
基本WebSocket实现
class ChatWebSocket(tornado.websocket.WebSocketHandler):
    """Minimal chat room: every message is relayed to all other clients."""

    # All currently connected handlers; shared by every instance.
    clients = set()

    def open(self):
        """Register the new connection."""
        ChatWebSocket.clients.add(self)
        print("WebSocket opened")

    def on_message(self, message):
        """Relay ``message`` to every connected client except the sender."""
        outgoing = f"Someone said: {message}"
        # Iterate over a snapshot because closed peers are pruned from the
        # set inside the loop.
        for client in list(ChatWebSocket.clients):
            if client is self:
                continue
            try:
                client.write_message(outgoing)
            except tornado.websocket.WebSocketClosedError:
                # The peer went away before its on_close ran; drop it and
                # keep delivering to the remaining clients.
                ChatWebSocket.clients.discard(client)

    def on_close(self):
        """Unregister the connection."""
        # discard() rather than remove(): the handler may already have been
        # pruned by a failed broadcast.
        ChatWebSocket.clients.discard(self)
        print("WebSocket closed")
# Expose the chat WebSocket endpoint at /chat.
app = tornado.web.Application(
    handlers=[(r"/chat", ChatWebSocket)],
)
WebSocket安全
class AuthWebSocket(tornado.websocket.WebSocketHandler):
    """WebSocket handler that restricts origins and requires a user cookie."""

    def check_origin(self, origin):
        """Return True only when ``origin`` comes from an allowed host.

        The Origin header is a full URL such as ``http://localhost:8888``,
        so it is parsed and only its host name is compared. Comparing the
        parsed ``netloc`` against full ``http://...`` strings can never
        match and would reject every connection.
        """
        import urllib.parse

        allowed_hosts = {"example.com", "localhost"}
        # hostname is None for an unparsable origin, which is rejected.
        origin_host = urllib.parse.urlparse(origin).hostname
        return origin_host in allowed_hosts

    def open(self):
        """Close the connection right away when no "user" cookie is present."""
        # NOTE(review): get_cookie reads an unsigned cookie that a client
        # can set itself; consider get_secure_cookie if the application
        # configures a cookie_secret.
        if not self.get_cookie("user"):
            self.close()
数据库集成
使用 asyncpg (PostgreSQL)
import asyncpg
class DBHandler(tornado.web.RequestHandler):
    """Handler that opens one asyncpg connection per request.

    NOTE(review): connecting on every request is simple but costly; a
    shared pool (asyncpg.create_pool) is presumably the better fit for
    real traffic.
    """

    async def prepare(self):
        """Open the database connection before the HTTP method runs."""
        self.db = await asyncpg.connect(
            user='user',
            password='password',
            database='dbname',
            host='localhost'
        )

    def on_finish(self):
        """Close the connection once the response has been sent.

        Tornado calls on_finish synchronously and ignores its return value,
        so a coroutine defined here would never be awaited and the
        connection would leak. The asynchronous close is therefore
        scheduled on the IOLoop instead.
        """
        # prepare() may have failed before self.db was assigned.
        db = getattr(self, "db", None)
        if db is not None:
            tornado.ioloop.IOLoop.current().spawn_callback(db.close)

    async def get(self):
        """Return every row of the ``users`` table as JSON."""
        rows = await self.db.fetch("SELECT * FROM users")
        # asyncpg returns Record objects, which the JSON encoder used by
        # self.write cannot serialize; convert each one to a plain dict.
        # NOTE(review): column values such as datetimes would still need
        # explicit conversion.
        self.write({"users": [dict(row) for row in rows]})
使用 Tortoise-ORM
from tortoise.contrib.tornado import register_tortoise
# NOTE(review): confirm that the installed tortoise-orm release really ships
# `tortoise.contrib.tornado.register_tortoise` (imported just above); if it
# does not, the ORM has to be initialised another way before the server starts.
app = tornado.web.Application([
# your routes
])
# Attach the ORM to the application: connection URL, the modules that hold
# the models, and generate_schemas=True.
register_tortoise(
app,
db_url="postgres://user:password@localhost/dbname",
modules={"models": ["models"]},
generate_schemas=True
)
# models.py
from tortoise.models import Model
from tortoise import fields
class User(Model):
    """ORM model for a user: integer primary key, name and email."""

    id = fields.IntField(pk=True)
    # Character columns limited to 50 and 100 characters respectively.
    name = fields.CharField(max_length=50)
    email = fields.CharField(max_length=100)
用户认证
基本认证系统
class BaseHandler(tornado.web.RequestHandler):
    """Common base class that resolves the current user from a signed cookie."""

    def get_current_user(self):
        """Return the value of the signed "user" cookie."""
        signed_user = self.get_secure_cookie("user")
        return signed_user
class LoginHandler(BaseHandler):
    """Login form plus a demo credential check."""

    def get(self):
        """Show the login form."""
        self.render("login.html")

    async def post(self):
        """Check the submitted credentials and redirect accordingly."""
        username = self.get_body_argument("username")
        password = self.get_body_argument("password")
        # Demo-only check against fixed credentials.
        credentials_ok = (username, password) == ("admin", "password")
        if not credentials_ok:
            self.redirect("/login")
            return
        self.set_secure_cookie("user", username)
        self.redirect("/")
class ProtectedHandler(BaseHandler):
    """Page that is only served to authenticated users."""

    @tornado.web.authenticated
    def get(self):
        """Greet the logged-in user by name."""
        # current_user holds the raw cookie bytes returned by
        # BaseHandler.get_current_user, so decode it for display.
        username = self.current_user.decode()
        self.write(f"Welcome, {username}")
# Application for the cookie-based login demo.
# login_url is required by @tornado.web.authenticated: it is where visitors
# without a valid "user" cookie are redirected. Without it the decorator
# raises an error instead of redirecting.
# Replace the placeholder cookie_secret with a long random value that is
# kept out of source control.
app = tornado.web.Application(
    [
        (r"/", ProtectedHandler),
        (r"/login", LoginHandler),
    ],
    cookie_secret="YOUR_SECRET_KEY_HERE",
    login_url="/login",
)
使用第三方认证
from tornado.auth import GoogleOAuth2Mixin
class GoogleLoginHandler(tornado.web.RequestHandler, GoogleOAuth2Mixin):
    """Google OAuth2 login: start the flow, then handle Google's callback."""

    async def get(self):
        """Handle both legs of the OAuth2 flow.

        Without a ``code`` argument the browser is sent to Google's
        consent page. With one (Google's callback) the code is exchanged
        for an access token, the user's profile is fetched, and the email
        address is stored in the signed "user" cookie.
        """
        # Must match the redirect URI registered with Google.
        redirect_uri = "http://your.site.com/auth/google"
        code = self.get_argument("code", None)
        if code:
            # get_authenticated_user returns the token response, not the
            # profile, so the email comes from the userinfo endpoint.
            access = await self.get_authenticated_user(
                redirect_uri=redirect_uri,
                code=code,
            )
            user = await self.oauth2_request(
                "https://www.googleapis.com/oauth2/v1/userinfo",
                access_token=access["access_token"],
            )
            # Persist the user identity.
            self.set_secure_cookie("user", user["email"])
            self.redirect("/")
        else:
            # authorize_redirect is an ordinary synchronous method in
            # Tornado 6+, where it returns None and must not be awaited.
            self.authorize_redirect(
                redirect_uri=redirect_uri,
                client_id=self.settings["google_oauth"]["key"],
                scope=["profile", "email"],
                response_type="code",
                extra_params={"approval_prompt": "auto"},
            )
部署与优化
多进程模式
if __name__ == "__main__":
    # Multi-process startup: bind the port once, then start worker processes.
    # NOTE(review): check whether the targeted Tornado release still
    # recommends bind()/start() for multi-process servers.
    http_server = tornado.httpserver.HTTPServer(make_app())
    http_server.bind(8888)
    # 0 means "decide the number of processes from the CPU core count".
    http_server.start(0)
    event_loop = tornado.ioloop.IOLoop.current()
    event_loop.start()
使用 Nginx 反向代理
Nginx 配置示例:
nginx
# Reverse proxy in front of a Tornado process listening on 127.0.0.1:8888.
server {
listen 80;
server_name example.com;
# Forward every other request to Tornado, passing along the original Host
# header and the client address.
# NOTE(review): WebSocket routes proxied through this block presumably also
# need "proxy_http_version 1.1" plus the Upgrade/Connection headers - confirm.
location / {
proxy_pass http://127.0.0.1:8888;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
# Serve static files straight from disk with a 30-day expiry.
location /static/ {
alias /path/to/your/static/files/;
expires 30d;
}
}
静态文件处理
# Serve files from the "static" directory next to this module under /static/.
app = tornado.web.Application(
    handlers=[
        # your routes
    ],
    static_url_prefix="/static/",
    static_path=os.path.join(os.path.dirname(__file__), "static"),
)
测试
单元测试
from tornado.testing import AsyncHTTPTestCase
class TestApp(AsyncHTTPTestCase):
    """HTTP-level tests run against the application built by make_app()."""

    def get_app(self):
        """Provide the application under test."""
        return make_app()

    def test_homepage(self):
        """GET / answers 200 and the body contains "Hello"."""
        home = self.fetch("/")
        self.assertEqual(home.code, 200)
        self.assertIn(b"Hello", home.body)

    def test_post(self):
        """POST /submit with a form body answers 200."""
        # NOTE(review): this assumes make_app() registers a /submit route
        # that accepts POST - confirm against the application under test.
        result = self.fetch("/submit", method="POST", body="name=test")
        self.assertEqual(result.code, 200)
运行测试:
python -m unittest test_module
实际应用示例
RESTful API
import json
from tornado.web import RequestHandler
from tornado.escape import json_decode
class UserHandler(RequestHandler):
    """JSON endpoint for reading and creating users.

    The handler calls ``get_user``, ``get_all_users`` and ``create_user``
    on itself; they are presumably supplied elsewhere (a subclass or
    mixin) and are not defined in this class.
    """

    def set_default_headers(self):
        """Mark every response as JSON."""
        self.set_header("Content-Type", "application/json")

    async def get(self, user_id=None):
        """Return one user when ``user_id`` is given, otherwise all users."""
        if not user_id:
            all_users = await self.get_all_users()
            self.write(json.dumps(all_users))
            return
        user = await self.get_user(user_id)
        if not user:
            self.set_status(404)
            payload = {"error": "User not found"}
        else:
            payload = user
        self.write(json.dumps(payload))

    async def post(self):
        """Create a user from the JSON body and answer 201 with its id.

        Any exception raised while decoding, creating or serializing is
        reported as a 400 response that carries the error text.
        """
        try:
            fields = json_decode(self.request.body)
            new_id = await self.create_user(fields)
            self.set_status(201)
            self.write(json.dumps({"id": new_id}))
        except Exception as exc:
            self.set_status(400)
            self.write(json.dumps({"error": str(exc)}))

    # Implement the other HTTP methods...
实时聊天应用
class ChatHandler(tornado.websocket.WebSocketHandler):
    """Chat room handler with a bounded cache of recent messages."""

    # Connected handlers, shared by all instances.
    waiters = set()
    # Most recent chat messages, oldest first, capped at cache_size.
    cache = []
    cache_size = 200

    def get_compression_options(self):
        """Enable compression by returning an (empty) options dict."""
        return {}

    def open(self):
        """Register the new connection."""
        ChatHandler.waiters.add(self)

    def on_close(self):
        """Unregister the connection."""
        # discard() so that a handler that is no longer registered does
        # not raise KeyError.
        ChatHandler.waiters.discard(self)

    @classmethod
    def update_cache(cls, chat):
        """Append ``chat`` to the cache, keeping the newest cache_size entries."""
        cls.cache.append(chat)
        if len(cls.cache) > cls.cache_size:
            cls.cache = cls.cache[-cls.cache_size:]

    @classmethod
    def send_updates(cls, chat):
        """Send ``chat`` to every connected client.

        A failure for one client is logged and does not stop delivery to
        the others.
        """
        for waiter in cls.waiters:
            try:
                waiter.write_message(chat)
            except Exception:
                # Exception rather than a bare except, so that
                # KeyboardInterrupt and SystemExit still propagate.
                logging.error("Error sending message", exc_info=True)

    def on_message(self, message):
        """Turn an incoming JSON message into a chat entry and broadcast it.

        The message is expected to be a JSON object with "body" and "user"
        keys. The entry gets a fresh UUID, a timestamp and an HTML
        rendering from the message.html template.
        """
        parsed = tornado.escape.json_decode(message)
        chat = {
            "id": str(uuid.uuid4()),
            "body": parsed["body"],
            "type": "chat",
            "user": parsed["user"],
            "timestamp": time.time()
        }
        chat["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=chat))
        ChatHandler.update_cache(chat)
        ChatHandler.send_updates(chat)
总结
Tornado 是一个强大的异步 Web 框架,特别适合需要高性能和实时通信的应用。它的主要特点包括:
- 非阻塞 I/O 和事件驱动架构
- 内置 WebSocket 支持
- 高性能 HTTP 服务器
- 强大的异步处理能力
- 内置模板引擎
- 支持多种认证方式
通过本教程,您应该已经掌握了 Tornado 的核心功能。要了解更多,请参考 Tornado 官方文档。