Python Tornado框架教程

Tornado 是一个 Python 的异步网络框架和 Web 服务器,最初由 FriendFeed 开发;FriendFeed 被 Facebook 收购后,Tornado 随之开源。它以高性能和非阻塞 I/O 著称,特别适合处理长轮询、WebSocket 和其他需要持久连接的应用。

目录

  1. 安装与快速开始
  2. 基本路由与请求处理
  3. 模板系统
  4. 异步处理
  5. WebSocket
  6. 数据库集成
  7. 用户认证
  8. 部署与优化
  9. 测试
  10. 实际应用示例

安装与快速开始

安装 Tornado

pip install tornado

最小应用

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    """Request handler that answers with a fixed greeting."""

    def get(self):
        """Write the greeting text as the body of a GET response."""
        greeting = "Hello, Tornado!"
        self.write(greeting)

def make_app():
    """Return an Application whose only route maps "/" to MainHandler."""
    routes = [(r"/", MainHandler)]
    return tornado.web.Application(routes)

if __name__ == "__main__":
    # Build the application and listen on port 8888.
    app = make_app()
    app.listen(8888)
    # Hand control to the I/O loop; this call blocks until the loop stops.
    event_loop = tornado.ioloop.IOLoop.current()
    event_loop.start()

运行后访问 http://localhost:8888 即可看到 "Hello, Tornado!"。


基本路由与请求处理

路由配置

class HomeHandler(tornado.web.RequestHandler):
    """Handler that serves the home page text."""

    def get(self):
        """Reply to GET with the home page body."""
        page_text = "Home Page"
        self.write(page_text)

class AboutHandler(tornado.web.RequestHandler):
    """Handler that serves the about page text."""

    def get(self):
        """Reply to GET with the about page body."""
        page_text = "About Page"
        self.write(page_text)

def make_app():
    """Return an Application routing "/" and "/about" to their handlers."""
    url_specs = [
        (r"/", HomeHandler),
        (r"/about", AboutHandler),
    ]
    return tornado.web.Application(url_specs)

HTTP 方法处理

class ResourceHandler(tornado.web.RequestHandler):
    """Echoes the HTTP verb and the resource id captured from the URL.

    Every method receives ``resource_id`` as the text matched by the
    route's capture group.
    """

    def get(self, resource_id):
        """Describe a GET on the given resource."""
        reply = f"GET resource {resource_id}"
        self.write(reply)

    def post(self, resource_id):
        """Describe a POST on the given resource."""
        reply = f"POST to resource {resource_id}"
        self.write(reply)

    def put(self, resource_id):
        """Describe a PUT on the given resource."""
        reply = f"PUT resource {resource_id}"
        self.write(reply)

    def delete(self, resource_id):
        """Describe a DELETE on the given resource."""
        reply = f"DELETE resource {resource_id}"
        self.write(reply)

# Application with a single route; the regex group ([0-9]+) captures a
# numeric id from the URL path for ResourceHandler.
app = tornado.web.Application([
    (r"/resource/([0-9]+)", ResourceHandler),
])

请求参数获取

class ParamsHandler(tornado.web.RequestHandler):
    """Shows how to read query-string, form and JSON request data."""

    def get(self):
        """Greet the caller using the ``name`` and ``color`` query parameters."""
        # Every value supplied for the repeated ``color`` parameter.
        colors = self.get_query_arguments("color")
        # Single-valued parameter; "Guest" is used when it is absent.
        name = self.get_query_argument("name", default="Guest")

        color_list = ', '.join(colors)
        self.write(f"Hello {name}, colors: {color_list}")

    def post(self):
        """Echo the form fields and, when the body parses as JSON, its content."""
        # Single form field (no default is given for it).
        username = self.get_body_argument("username")

        # Every value supplied for the repeated ``interest`` form field.
        interests = self.get_body_arguments("interest")

        # Start from an empty dict and replace it only if the raw body
        # decodes as JSON.
        data = {}
        try:
            data = tornado.escape.json_decode(self.request.body)
        except ValueError:
            pass

        self.write(f"Username: {username}, Interests: {interests}, JSON: {data}")

响应处理

class ResponseHandler(tornado.web.RequestHandler):
    """Shows the common ways of shaping an HTTP response."""

    def get(self):
        """Send a plain-text reply with a custom header and a cookie."""
        # Status code.
        self.set_status(200)

        # Headers, applied in the order listed.
        for header_name, header_value in (
            ("Content-Type", "text/plain"),
            ("X-Custom-Header", "Value"),
        ):
            self.set_header(header_name, header_value)

        # Cookie that expires after one day.
        self.set_cookie("session_id", "12345", expires_days=1)

        # A redirect would be issued like this instead:
        # self.redirect("/new-location")

        # Response body.
        body = "This is the response body"
        self.write(body)

        # Explicitly end the response.
        self.finish()

模板系统

基本模板使用

创建模板文件 templates/index.html:

<!DOCTYPE html>
<html>
<head>
    {# title, header and items are template variables supplied to render(). #}
    <title>{{ title }}</title>
</head>
<body>
    <h1>{{ header }}</h1>
    <ul>
        {% for item in items %}
        {# Tornado autoescapes expression output by default, so calling
           escape() here as well would escape each item twice. #}
        <li>{{ item }}</li>
        {% end %}
    </ul>
</body>
</html>

运行 HTML

在Handler中使用模板:

class TemplateHandler(tornado.web.RequestHandler):
    """Renders index.html with a title, a header and a list of items."""

    def get(self):
        """Collect the template variables and render the page."""
        context = {
            "title": "My Page",
            "header": "Welcome!",
            "items": ["Item 1", "Item 2", "Item 3"],
        }
        self.render("index.html", **context)

模板继承

基础模板 templates/base.html:

<!DOCTYPE html>
<html>
<head>
    {# Named block "title": renders "Default Title" unless it is overridden. #}
    <title>{% block title %}Default Title{% end %}</title>
</head>
<body>
    {# Named block "content": empty here. #}
    {% block content %}{% end %}
</body>
</html>

运行 HTML

子模板 templates/page.html:

{# Child template: extends base.html and redefines its two named blocks. #}
{% extends "base.html" %}

{# Replaces the contents of the "title" block. #}
{% block title %}Page Title{% end %}

{# Replaces the contents of the "content" block. #}
{% block content %}
    <h1>Page Content</h1>
    <p>This is the page content.</p>
{% end %}

运行 HTML


异步处理

协程处理

from tornado import gen

class AsyncHandler(tornado.web.RequestHandler):
    """Handler written with decorator-based (``gen.coroutine``) coroutines."""

    @gen.coroutine
    def do_something_async(self):
        """Wait one second without blocking, then resolve to "Done"."""
        # Stand-in for a slow operation.
        yield gen.sleep(1)
        return "Done"

    @gen.coroutine
    def get(self):
        """Wait for the simulated operation and write its result."""
        outcome = yield self.do_something_async()
        self.write(f"Result: {outcome}")

原生 async/await (Python 3.5+)

class NativeAsyncHandler(tornado.web.RequestHandler):
    """Same flow as the decorator-based handler, using native async/await."""

    async def do_something_async(self):
        """Wait one second without blocking, then return "Done"."""
        await gen.sleep(1)
        return "Done"

    async def get(self):
        """Await the simulated operation and write its result."""
        outcome = await self.do_something_async()
        self.write(f"Result: {outcome}")

并行请求

class ParallelHandler(tornado.web.RequestHandler):
    """Fetches two URLs concurrently and reports the size of each body."""

    async def get(self):
        """Issue both HTTP requests at once and write their body lengths.

        An error raised by either fetch propagates out of this handler.
        """
        # Imported here so the handler does not depend on a module-level
        # import of AsyncHTTPClient.
        from tornado.httpclient import AsyncHTTPClient

        http_client = AsyncHTTPClient()

        # gen.multi waits on all the fetches together, so the requests
        # overlap instead of running one after another.
        response1, response2 = await gen.multi([
            http_client.fetch("http://example.com"),
            http_client.fetch("http://example.org"),
        ])

        self.write(f"Got {len(response1.body)} from example.com and "
                   f"{len(response2.body)} from example.org")

WebSocket

基本WebSocket实现

class ChatWebSocket(tornado.websocket.WebSocketHandler):
    """Minimal chat socket that relays each message to every other client."""

    # Open connections, shared by all instances of the handler.
    clients = set()

    def open(self):
        """Register the new connection."""
        ChatWebSocket.clients.add(self)
        print("WebSocket opened")

    def on_message(self, message):
        """Forward ``message`` to every connected client except the sender."""
        # Iterate over a snapshot because dead peers are dropped from the
        # set while broadcasting.
        for client in tuple(ChatWebSocket.clients):
            if client is self:
                continue
            try:
                client.write_message(f"Someone said: {message}")
            except tornado.websocket.WebSocketClosedError:
                # The peer went away; forget it and keep delivering to the
                # remaining clients.
                ChatWebSocket.clients.discard(client)

    def on_close(self):
        """Unregister the connection."""
        # discard() rather than remove(): the socket may never have been
        # registered, or may already have been dropped during a broadcast.
        ChatWebSocket.clients.discard(self)
        print("WebSocket closed")

# Application exposing the chat WebSocket handler at /chat.
app = tornado.web.Application([
    (r"/chat", ChatWebSocket),
])

WebSocket安全

class AuthWebSocket(tornado.websocket.WebSocketHandler):
    """WebSocket handler that restricts origins and requires a signed cookie."""

    # Origins, written as scheme://host[:port], that may open this socket.
    allowed_origins = frozenset({"http://example.com", "http://localhost"})

    def check_origin(self, origin):
        """Return True only when ``origin`` is one of ``allowed_origins``.

        The origin is rebuilt as ``scheme://netloc`` before the lookup so it
        is compared with whitelist entries of the same shape; comparing the
        bare netloc against entries that include a scheme would never match.
        """
        from urllib.parse import urlparse

        parsed_origin = urlparse(origin)
        normalized = f"{parsed_origin.scheme}://{parsed_origin.netloc}".lower()
        return normalized in self.allowed_origins

    def open(self):
        """Close the connection unless a valid signed "user" cookie is present."""
        # A plain cookie can be set to any value by the client, so the signed
        # variant is checked. This requires the application's
        # ``cookie_secret`` setting to be configured.
        if not self.get_secure_cookie("user"):
            self.close()

数据库集成

使用 asyncpg (PostgreSQL)

import asyncpg

class DBHandler(tornado.web.RequestHandler):
    """Handler that opens a PostgreSQL connection for the span of one request."""

    async def prepare(self):
        """Open the connection before the HTTP verb method runs."""
        self.db = await asyncpg.connect(
            user='user',
            password='password',
            database='dbname',
            host='localhost'
        )

    def on_finish(self):
        """Schedule the connection to be closed once the request has ended.

        Tornado calls ``on_finish`` synchronously and does not await its
        result, so an ``async def`` here would never run and the connection
        would leak. The close coroutine is handed to the I/O loop instead.
        """
        # prepare() may have failed before ``self.db`` was assigned.
        db = getattr(self, "db", None)
        if db is not None:
            tornado.ioloop.IOLoop.current().spawn_callback(db.close)

    async def get(self):
        """Return every row of the ``users`` table as JSON."""
        rows = await self.db.fetch("SELECT * FROM users")
        # The fetched records are converted to plain dicts so the JSON
        # encoder used by write() can serialize them.
        # NOTE(review): columns holding non-JSON types (dates, decimals, ...)
        # would still need explicit conversion — confirm against the schema.
        self.write({"users": [dict(row) for row in rows]})

使用 Tortoise-ORM

from tortoise.contrib.tornado import register_tortoise

# Application whose route table is left empty in this example.
app = tornado.web.Application([
    # your routes
])

# Register the ORM with the application.
# NOTE(review): confirm that the installed tortoise-orm release provides a
# Tornado integration exposing `register_tortoise`; verify against its docs.
register_tortoise(
    app,
    db_url="postgres://user:password@localhost/dbname",  # placeholder credentials
    modules={"models": ["models"]},  # presumably refers to models.py — confirm
    generate_schemas=True
)

# models.py
from tortoise.models import Model
from tortoise import fields

class User(Model):
    """ORM model with an integer primary key, a name and an email."""

    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=50)
    email = fields.CharField(max_length=100)

用户认证

基本认证系统

class BaseHandler(tornado.web.RequestHandler):
    """Common base class that resolves the current user from a signed cookie."""

    def get_current_user(self):
        """Return the value of the signed "user" cookie."""
        user_cookie = self.get_secure_cookie("user")
        return user_cookie

class LoginHandler(BaseHandler):
    """Serves the login form and processes its submission."""

    def get(self):
        """Render the login form."""
        self.render("login.html")

    async def post(self):
        """Check the submitted credentials and redirect accordingly."""
        username = self.get_body_argument("username")
        password = self.get_body_argument("password")

        # Credential check (example only: fixed username and password).
        credentials_ok = (username, password) == ("admin", "password")
        if not credentials_ok:
            self.redirect("/login")
            return

        # Remember the user in a signed cookie, then go to the protected page.
        self.set_secure_cookie("user", username)
        self.redirect("/")

class ProtectedHandler(BaseHandler):
    """Page that is only served to authenticated users."""

    @tornado.web.authenticated
    def get(self):
        """Greet the logged-in user by name."""
        # current_user holds bytes here, so it is decoded before display.
        username = self.current_user.decode()
        self.write(f"Welcome, {username}")

# login_url is required by @tornado.web.authenticated: it is where anonymous
# GET requests are redirected. Without it the decorator raises instead of
# redirecting to the login page.
# cookie_secret is a placeholder; replace it with a long random secret and
# keep it out of source control.
app = tornado.web.Application(
    [
        (r"/", ProtectedHandler),
        (r"/login", LoginHandler),
    ],
    cookie_secret="YOUR_SECRET_KEY_HERE",
    login_url="/login",
)

使用第三方认证

from tornado.auth import GoogleOAuth2Mixin

class GoogleLoginHandler(tornado.web.RequestHandler, GoogleOAuth2Mixin):
    """Signs users in with Google OAuth2.

    Reads the client id from ``self.settings["google_oauth"]["key"]``.
    """

    async def get(self):
        """Start the OAuth2 flow, or complete it when Google sends back a code."""
        redirect_uri = "http://your.site.com/auth/google"
        code = self.get_argument("code", False)

        if not code:
            # First visit: send the browser to Google's consent page.
            # authorize_redirect is an ordinary synchronous call in
            # Tornado 6, so it must not be awaited.
            self.authorize_redirect(
                redirect_uri=redirect_uri,
                client_id=self.settings["google_oauth"]["key"],
                scope=["profile", "email"],
                response_type="code",
                extra_params={"approval_prompt": "auto"}
            )
            return

        # Exchange the authorization code for an access token. The result
        # holds token data, not the user's profile.
        access = await self.get_authenticated_user(
            redirect_uri=redirect_uri,
            code=code
        )
        # Fetch the profile, which carries the e-mail address.
        user = await self.oauth2_request(
            "https://www.googleapis.com/oauth2/v1/userinfo",
            access_token=access["access_token"]
        )

        # Remember the user in a signed cookie.
        self.set_secure_cookie("user", user["email"])
        self.redirect("/")

部署与优化

多进程模式

if __name__ == "__main__":
    app = make_app()

    # Wrap the application in an explicit HTTP server and bind it to port 8888.
    server = tornado.httpserver.HTTPServer(app)
    server.bind(8888)

    # Start multiple processes (usually one per CPU core);
    # 0 means the count is chosen from the number of CPU cores.
    server.start(0)

    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.start()

使用 Nginx 反向代理

Nginx 配置示例:

nginx

# Virtual host for example.com listening on port 80.
server {
    listen 80;
    server_name example.com;

    # Forward every other request to the backend on 127.0.0.1:8888,
    # passing along the original Host header and the client address.
    # NOTE(review): no Upgrade/Connection headers are forwarded here;
    # WebSocket endpoints behind this location presumably need them — confirm.
    location / {
        proxy_pass http://127.0.0.1:8888;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # Serve /static/ straight from disk with a 30-day expiry.
    location /static/ {
        alias /path/to/your/static/files/;
        expires 30d;
    }
}

静态文件处理

# Application configured to serve the "static" directory located next to
# this file under the /static/ URL prefix.
# NOTE(review): relies on `os` being imported by the surrounding module.
app = tornado.web.Application([
    # your routes
], 
static_path=os.path.join(os.path.dirname(__file__), "static"),
static_url_prefix="/static/"
)

测试

单元测试

from tornado.testing import AsyncHTTPTestCase

class TestApp(AsyncHTTPTestCase):
    """HTTP-level tests run against the application built by make_app()."""

    def get_app(self):
        """Return the application under test."""
        application = make_app()
        return application

    def test_homepage(self):
        """GET / answers 200 and the body contains "Hello"."""
        resp = self.fetch("/")
        self.assertEqual(resp.code, 200)
        self.assertIn(b"Hello", resp.body)

    def test_post(self):
        """POST /submit with a form body answers 200.

        Assumes make_app() registers a /submit route — TODO confirm.
        """
        resp = self.fetch("/submit", method="POST", body="name=test")
        self.assertEqual(resp.code, 200)

运行测试:

python -m unittest test_module

实际应用示例

RESTful API

import json
from tornado.web import RequestHandler
from tornado.escape import json_decode

class UserHandler(RequestHandler):
    """JSON endpoint for reading and creating users.

    Calls ``get_user``, ``get_all_users`` and ``create_user`` on itself;
    none of them is defined in this class.
    """

    def set_default_headers(self):
        """Mark every response as JSON."""
        self.set_header("Content-Type", "application/json")

    async def get(self, user_id=None):
        """Return one user when ``user_id`` is given, otherwise all users."""
        if not user_id:
            everyone = await self.get_all_users()
            self.write(json.dumps(everyone))
            return

        user = await self.get_user(user_id)
        if not user:
            # No such user: answer 404 with an error payload.
            self.set_status(404)
            self.write(json.dumps({"error": "User not found"}))
            return

        self.write(json.dumps(user))

    async def post(self):
        """Create a user from the JSON body and answer 201 with its id.

        Any exception raised along the way is reported as a 400 whose
        payload carries the exception text.
        """
        try:
            payload = json_decode(self.request.body)
            new_id = await self.create_user(payload)
            self.set_status(201)
            self.write(json.dumps({"id": new_id}))
        except Exception as exc:
            self.set_status(400)
            self.write(json.dumps({"error": str(exc)}))

    # Implement the other HTTP methods...

实时聊天应用

class ChatHandler(tornado.websocket.WebSocketHandler):
    """Chat socket that broadcasts messages and keeps a bounded history.

    The class references ``logging``, ``time``, ``uuid`` and
    ``tornado.escape``, which the enclosing module must import.
    """

    # Open connections, shared by all instances.
    waiters = set()
    # Most recent chat messages, oldest first.
    cache = []
    # Maximum number of messages kept in ``cache``.
    cache_size = 200

    def get_compression_options(self):
        """Enable compression with default options."""
        # A non-None return value turns compression on.
        return {}

    def open(self):
        """Register the new connection."""
        ChatHandler.waiters.add(self)

    def on_close(self):
        """Unregister the connection."""
        # discard() rather than remove(): the socket may never have been
        # registered, so a missing entry must not raise.
        ChatHandler.waiters.discard(self)

    @classmethod
    def update_cache(cls, chat):
        """Append ``chat`` to the history, dropping the oldest entries over the cap."""
        cls.cache.append(chat)
        if len(cls.cache) > cls.cache_size:
            cls.cache = cls.cache[-cls.cache_size:]

    @classmethod
    def send_updates(cls, chat):
        """Send ``chat`` to every connected client, logging per-client failures."""
        # Iterate over a snapshot so a connection closing mid-broadcast
        # cannot change the set during iteration.
        for waiter in tuple(cls.waiters):
            try:
                waiter.write_message(chat)
            except Exception:
                # Best effort: one failing client must not stop the others.
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                logging.error("Error sending message", exc_info=True)

    def on_message(self, message):
        """Parse an incoming JSON message, render it, cache it and broadcast it.

        Messages that are not valid JSON, or that lack the "body" or "user"
        keys, are logged and ignored.
        """
        try:
            parsed = tornado.escape.json_decode(message)
            body = parsed["body"]
            user = parsed["user"]
        except (ValueError, KeyError, TypeError):
            logging.warning("Ignoring malformed chat message", exc_info=True)
            return

        chat = {
            "id": str(uuid.uuid4()),
            "body": body,
            "type": "chat",
            "user": user,
            "timestamp": time.time()
        }
        # Pre-render the message so clients can insert the HTML directly.
        chat["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=chat))

        ChatHandler.update_cache(chat)
        ChatHandler.send_updates(chat)

总结

Tornado 是一个强大的异步 Web 框架,特别适合需要高性能和实时通信的应用。它的主要特点包括:

  1. 非阻塞 I/O 和事件驱动架构
  2. 内置 WebSocket 支持
  3. 高性能 HTTP 服务器
  4. 强大的异步处理能力
  5. 内置模板引擎
  6. 支持多种认证方式

通过本教程,您应该已经掌握了 Tornado 的核心功能。要了解更多,请参考 Tornado 官方文档(https://www.tornadoweb.org/)。









results matching ""

    No results matching ""