借助 scrapy-redis 实现 scrapy 分布式爬虫

最原始的 Scrapy 项目是只能将爬虫部署到单机上,如果要实现分布式爬虫就需要手动去维护一个待抓取的列表,那么 scrapy-redis 项目就是这样一个存在。

特性:

  • 分布式抓取,可以部署多个 spider 实例,共享同一个 redis 队列
  • 分布式后处理,抓取的内容会放到一个队列中,这样就意味可以开启足够多的实例来处理结果
  • 提供了即插即用的组件,包括定时,去重,等等

安装

pip install scrapy-redis

使用

先要在 settings 中配置,具体参考官方文档,代码集成如下:

from scrapy_redis.spiders import RedisSpider

class MySpider(RedisSpider):
    name = 'myspider'

    def parse(self, response):
        # do stuff
        pass

reference


2017-04-30 scrapy , scrapy-redis , redis , spider , distributed

Linux 安装 VMware workstation 12

VMware Workstation 12 虚拟机,适用于 RHEL/CentOS 7, Fedora 20-24, Debian 7-9, Ubuntu 16.04-14.14 and Linux Mint 17-18.

Prerequisites

  • 确保系统 64 位
  • VMware 12 不支持 32 位 CPU
  • 确保有 root 权限

安装

  • 更新

    apt-get update && apt-get upgrade # On Debian Systems

  • 下载

    wget ‘http://www.vmware.com/go/tryworkstation-linux-64’

  • 执行权限

    chmod +x VMware-Workstation-Full-12.5.5-5234757.x86_64.bundle

  • 执行安装

    ./VMware-Workstation-Full-12.5.5-5234757.x86_64.bundle

启动之后,如果没有自动找到 gcc ,需要手动指定 gcc 版本, gcc-4.8 版本,在 /usr/bin/ 目录下。

安装系统

这一步只要有系统镜像,一步步安装是很快的。省略。

宿主共享文件

在 VM 菜单下, Setting 中 Option 可以添加宿主机的共享文件夹。

reference


2017-04-29 Linux , Mint , VMware

在 Python 中使用 redis 作为任务队列 Python RQ 使用

在学习 Flask 时接触 flask-rq2,然后知道了 python-rq 因为之前使用过 Celery ,所以对 python-rq 倒也没有多大的困惑。因为 python-rq 只依赖于 redis,相较于 Celery 轻量很多,如果在 Flask 中不需要非常繁重的后台任务队列(比如只有发送注册邮件任务)可以考虑使用 python-rq,毕竟有 flask-rq2 为 Flask 度身定做,结合 Flask 要比 Celery 容易很多。

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily.

官方的介绍也非常清晰,RQ 是一个用于后台任务的简单的 Python library。他使用 Redis 作为后台,他的设计目标就是降低入门门槛。他可以被轻松的整合到 Web 应用中。在项目的历史中,也不避讳的直言该项目受到 Celery,Resque 和 Flask snippet 中设计精美的部分的启发,然后作为一个轻量的任务队列框架而设计。

python-rq 的内容就不多介绍,感兴趣可以去官网 直接了解,非常简单。不过有几点需要注意。作为一个任务队列基本都要完成几个步骤

  • 定义耗时任务 def count_words_at_url(url): pass
  • 启动任务 worker rq worker
  • 提交任务到队列中 q = Queue(connection=redis_conn); q.enqueue(count_words_at_url, 'http://nvie.com')

但其实本文想要介绍的是 Flask-RQ2

Flask RQ2

安装

pip install Flask-RQ2

快速整合 Flask 参考 doc

第一步定义任务

from flask_rq2 import RQ

rq = RQ()

@rq.job
def add(x, y):
    return x + y

第二步启动 worker

flask rq worker

第三步提交到任务队列

job = add.queue(1, 2)
job2 = add.queue(3, 4, queue='high', timeout=60 * 2)

定时任务相关

@rq.job
def add(x, y):
    return x + y

# queue job in 60 seconds
add.schedule(timedelta(seconds=60), 1, 2)

# queue job at a certain datetime (UTC!)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)

# queue job in 14 days and then repeat once 14 days later
add.schedule(timedelta(days=14), 1, 2, repeat=1)

# queue job in 12 hours with a different queue
add.schedule(timedelta(hours=12), 1, 2, queue='high', timeout=60 * 2)

# queue job every day at noon (UTC!)
add.cron('0 0 12 * * ?', 'add-one-two', 1, 2)

# queue job every minute with a different queue
add.cron('* * * * *', 'add-one-two', 1, 2, queue='high', timeout=55)

Bug

不过在使用的过程中启动 flask rq worker 时遇到一个 bug

redis.exceptions.ResponseError: Command # 7 (EXPIRE rq:worker:ev.22880 None) of pipeline caused error: value is not an integer or out of range

类似这样的 bug 似乎是因为新版本原因造成的,我降级为 rq==0.10.0 就好了。

python-rq 和 Celery 比较

一句话总结,RQ 的设计目标是简洁,而 Celery 是更加健壮完善。

区别 RQ Celery 总结
文档 RQ doc 非常简洁易懂 Celery 文档虽然复杂,但是也很清晰,Celery 有非常多的选项 都很好
监控 RQ Dashboard Celery Flower 两者都非常容易部署,能满足 90 % 的需求 很好
Broker support RQ only supports Redis Celery 能够支持很多 Redis,RabbitMQ 等等,如果要确保任务执行,Celery 可能是更好的选择 Celery 优势
优先队列 RQ 的设计简洁高效,worker 可以从不同的队列中 消费 Celery 需要不同的 worker 从不同的队列中消费 都有方法轻松实现
系统支持 RQ 仅仅支持有 fork 操作的系统(Unix) Celery 不限 Celery 获胜
语言支持 RQ 仅仅支持 Python Celery 允许从一个语言发送任务到另外的语言 Celery 更胜一筹
API RQ API simple Celery API 非常灵活 multiple result backends, nice config format, workflow canvas support,支持很多特性,同样带来的是配置的复杂 Celery 更胜一筹
子任务支持 RQ 不支持 支持从任务中创建新任务 Celery 赞
Community and Stability 活跃 活跃 两个项目都是活跃开发状态

reference


2017-04-25 python , redis , queue , python-rq , flask-rq2

每天学习一个命令:tcpdump 命令行下抓包

Tcpdump 是一个运行在命令行下的抓包工具。它允许用户拦截和显示发送或收到过程中网络连接到该计算机的TCP/IP和其他数据包。Tcpdump 适用于大多数的类Unix系统操作系统(如linux,BSD等)。类Unix系统的 tcpdump 需要使用libpcap这个捕捉数据的库就像 Windows 下的WinPcap。

常见用法

过滤主机

tcpdump -i eth1 host 192.168.1.1

过滤端口

tcpdump -i eth1 port 25

网络过滤

tcpdump -i eth1 net 192.168

同类工具

Tshark是wireshark的命令行版本,类似于tcpdump,可以用于网络抓包,封包解析等。

抓取指定设备的网络包

tshark -i eth0

抓取目的端口80的包

tshark tcp dst port 80

2017-04-23 tcpdump , linux , network

Scrapy 学习笔记及简单使用

Scrapy 是纯 Python 实现的爬虫框架(scraping and crawling framework),可以非常轻松地提取网页结构信息。最初设计时 Scrapy 仅仅作为网页抓取工具,但因其功能强大,配置简单,逐渐的被扩大使用范围,也经常被用于以下方面:

  • 数据挖掘 Data Mining
  • 信息处理 information processing
  • 历史信息存储 historical archival
  • 检测及自动化测试 monitoring and automated testing

因为网上的教程已经非常详细了,这里就重点记录解决的几个问题。

  • Scrapy 的官网地址:http://scrapy.org
  • Scrapy 在 Github 上的项目地址:https://github.com/scrapy/scrapy.git
  • Scrapy 的官方文档地址:http://doc.scrapy.org/

搭建环境

安装 python 2.7

一般 Ubuntu/Linux Mint 都会预装,查看一下即可

python -V
Python 2.7.12

如果没有安装 Python,可以使用之前推荐的 pyenv 来安装。下面的步骤也同样可以放到 pyenv 中执行。

安装 virtualenv

在开发目录中虚拟化 python 环境,避免和系统依赖冲突

sudo pip install virtualenv
source ./bin/active # 开启
# 此后再使用 pip install 时会安装在独立的目录下

具体用法可参考官网

安装依赖

sudo apt-get install libxml2-dev libxslt1-dev python-dev
pip install scrapy

项目结构

安装完成之后使用如下命令生成初始项目

scrapy startproject demo

初始目录结构如下:

$ tree demo
demo
├── demo
│   ├── __init__.py
│   ├── items.py
│   ├── middlewares.py
│   ├── pipelines.py
│   ├── settings.py
│   └── spiders
│       └── __init__.py
└── scrapy.cfg

2 directories, 7 files

文件说明:

  • scrapy.cfg 项目的配置信息,主要为 Scrapy 命令行工具提供一个基础的配置信息。(爬虫相关的配置信息在 settings.py 文件中)
  • items.py 设置数据存储模板,用于结构化数据
  • middlewares 中间件,全局处理请求
  • pipelines 数据处理行为,如:一般结构化的数据持久化,存储数据库等操作
  • settings.py 爬虫的配置文件,如:递归的层数、并发数,延迟下载等
  • spiders 爬虫目录,如:创建文件,编写爬虫规则

进入目录

cd demo
scrapy genspider example example.com   # 使用该命令安装模板生成 Spider

更详细的入门见官网:https://doc.scrapy.org/en/latest/intro/tutorial.html

架构

Scrapy 使用了 Twisted 异步网络库来处理网络,可以对网站页面进行大量非阻塞的异步请求,能够对目标网站按照网站结构的层级次序逐级向下采集,并可以在已采集到的页面中提取其他符合要求的目标网页地址资源,从而实现从单个或多个入口进入,对目标网站进行全面扫描并获取所需的数据。结构如下:

Scrapy 的核心组件:

  • 引擎(Scrapy Engine) 用来处理整个系统的数据流,触发事务(框架核心),负责控制和调度各个组件

  • 调度器(Scheduler) 用来接受引擎发过来的请求,压入队列中,并在引擎再次请求的时候返回,如:要抓取的链接(URL)的优先队列,由它来决定下一个要抓取的 URL 是什么,并进行去重。

  • 下载器(Downloader) 下载器负责对目标页面发出请求并获取页面反馈的数据,之后传递给 Scrapy 引擎,最终传递给爬虫进行数据提取。

  • 爬虫(Spider) 爬虫是 Scrapy 的用户自行编写的一段数据提取程序,针对下载器返回的数据结构进行分析(一般为 HTML),并提取出其中的结构化数据,并可以指定其他需要跟进的 URL 和处理方法。每个爬虫负责处理一个或多个特定的网站。

  • 项目管道(Pipline) 负责处理爬虫从网页中抽取的实体,主要的功能是持久化实体(Item)、验证实体的有效性、清除垃圾信息。当页面被爬虫解析后,解析后内容将会发送到项目管理通道,经过几个特定的次序处理。

  • 数据 (Item) Item 是爬虫针对网页数据做解析后返回的数据,需要在使用之前预先定义好 Item 的数据结构,爬虫的解析程序负责将提取到的数据填充到 Item 中,并将 Item 返回,传递给数据管道进行后续处理。

  • 下载器中间件(Downloader Middlewares) 位于 Scrapy 引擎和下载器之间的框架,主要是处理 Scrapy 引擎和下载器之间的请求与响应。

  • 爬虫中间件(Spider Middlewares) 介于 Scrapy 引擎和 Spider 之间的框架,处理爬虫的响应输入和请求输出。

  • 调度中间件(Scheduler Middlewares) 介于 Scrapy 引擎和调度之间的中间件,从 Scrapy 引擎发送到调度的请求和响应。

图解见官网:https://doc.scrapy.org/en/latest/topics/architecture.html

使用 ImagesPipeline 下载图片

在 scrapy 中有实现的 ImagesPipeline , 默认即可下载大量的图片,如果想要实现自己的下载图片 Pipeline,并且自定义输出图片的文件的名字,可以重写 file_path() 方法。

import scrapy
from scrapy.pipelines.images import ImagesPipeline

class ImagePipeline(ImagesPipeline):
    default_headers = {
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
    }

    # 对各个图片 URL 返回一个 Request
    def get_media_requests(self, item, info):
        try:
            for image_url in item['image_urls']:
                f = image_url.split('.')[-1]
                yield scrapy.Request(image_url, meta={'image_name': item['image_name'], 'format': f}, headers=self.default_headers)
        except Exception as error:
            print error

    # 当一个单独项目中的所有图片请求完成时 (success, image_info_or_failure)
    def item_completed(self, results, item, info):
        image_paths = [x['path'] for ok, x in results if ok]
        if not image_paths:
            # raise DropItem("Item contains no images")
            print "Image path no exist"
        return item


    # Override the convert_image method to disable image conversion

    # scrapy convert image to jpg 重写此方法,可以下载自定的图片格式,不过可能需要特殊处理格式
    # def convert_image(self, image, size=None):
    #     buf = StringIO()
    #     try:
    #         image.save(buf, image.format)
    #     except Exception, ex:
    #         raise ImageException("Cannot process image. Error: %s" % ex)
    #
    #     return image, buf

    # 默认情况下,使用 ImagePipeline 组件下载图片的时候,图片名称是以图片 URL 的 SHA1 值进行保存的。
    # scrapy 0.12 可以覆盖 image_key 方法,在此后版本中 使用 file_path 来自定义下载图片名称
    # def image_key(self, url):
    #     image_guid = hashlib.sha1(url).hexdigest()
    #     return 'full/%s.jpg' % (image_guid)

    # http://stackoverflow.com/questions/6194041/scrapy-image-download-how-to-use-custom-filename/22263951#22263951
    def file_path(self, request, response=None, info=None):
        name = request.meta['image_name']
        f = request.meta['format']
        return 'full/%s.jpg' % name

定义 middlewares

middlewares 是 Scrapy 在请求时中间必须经过的步骤,在 settings 中有设置 DOWNLOADER_MIDDLEWARES

import random

from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware

from scrapy.conf import settings


class RandomUserAgentMiddleware(UserAgentMiddleware):

    def __init__(self, user_agent=''):
        self.user_agent = user_agent

    # 每一请求都会走这个函数,在这里随机挑选 UA
    def process_request(self, request, spider):
        ua = random.choice(settings.get('USER_AGENT_LIST'))
        if ua:
            print "******Current UserAgent: %s **************" % ua

            request.headers.setdefault("User-Agent", ua)


class ProxyMiddleware(object):
    def process_request(self, request, spider):
        request.meta['proxy'] = random.choice(settings.get('HTTP_PROXY_LIST'))

多 pipeline 协同处理

Item 在 Spider 中构造之后会被传送到 Pipeline 中,按照一定的顺序执行。一般情况下 pipeline 会做一些数据处理或存储的事情,一般写数据库操作都放到 Pipeline 中。

当一个 Item 要被多个 pipeline 处理时,需要定义:

ITEM_PIPELINES = {
    'imdb.pipelines.MoviePipeline': 300,
    'imdb.image_pipeline.ImagePipeline': 300
}

此时,Item 就会被两个 pipeline 处理,如果某个 pipeline 处理某一类事件,比如上述例子中, MoviePipeline 处理数据的存储,而 ImagePipeline 处理图片的下载。


2017-04-23 scrapy , python , crawler , spider , 学习笔记

使用 pyenv 管理 Python 版本

记录一下使用过程,留备以后使用。

pyenv 是 Python 版本管理工具。 pyenv 可以改变全局的 Python 版本,安装多个版本的 Python, 设置目录级别的 Python 版本,还能创建和管理 virtual python environments 。所有的设置都是用户级别的操作,不需要 sudo 命令。

pyenv 主要用来管理 Python 的版本,比如一个项目需要 Python 2.x ,一个项目需要 Python 3.x 。 而 virtualenv 主要用来管理 Python 包的依赖,不同项目需要依赖的包版本不同,则需要使用虚拟环境。

pyenv 通过系统修改环境变量来实现 Python 不同版本的切换。而 virtualenv 通过将 Python 包安装到一个目录来作为 Python 包虚拟环境,通过切换目录来实现不同包环境间的切换。

pyenv 的美好之处在于,它并没有使用将不同的 $PATH 植入不同的 shell 这种高耦合的工作方式,而是简单地在 $PATH 的最前面插入了一个垫片路径(shims):~/.pyenv/shims:/usr/local/bin:/usr/bin:/bin。所有对 Python 可执行文件的查找都会首先被这个 shims 路径截获,从而使后方的系统路径失效。

安装之前

不同系统请参考 Common build problems,安装必须的工具。

pyenv 安装

根据官网的 安装说明 或者 自动安装 。 如果使用 Mac 直接使用 Homebrew。安装成功后记得在 .bashrc 或者 .bash_profile 中添加三行来开启自动补全。

export PATH="$HOME/.pyenv/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"

根据自己的环境配置。

自动安装

pyenv 提供了自动安装的工具,执行命令安装即可:

curl -L https://raw.githubusercontent.com/yyuu/pyenv-installer/master/bin/pyenv-installer | bash

保证系统有 git ,否则需要新安装 git。

手动安装

如果想要更加详细的了解安装过程,可以使用手动安装。将 pyenv 检出到你想安装的目录。建议路径为:$HOME/.pyenv

cd ~
git clone git://github.com/yyuu/pyenv.git .pyenv
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
echo 'eval "$(pyenv init -)"' >> ~/.bashrc
source ~/.bashrc

添加环境变量。PYENV_ROOT 指向 pyenv 检出的根目录,并向 $PATH 添加 $PYENV_ROOT/bin 以提供访问 pyenv 命令的路径。

这里的 shell 配置文件(~/.bash_profile)依不同 Linux 而需作修改,如果使用 Zsh 则需要相应的配置 ~/.zshrc

在使用 pyenv 之后使用 pip 安装的第三方模块会自动安装到当前使用 python 版本下,不会和系统模块产生冲突。使用 pip 安装模块之后,如果没有生效,记得使用 pyenv rehash 来更新垫片路径。

pyenv 常用命令

使用 pyenv commands 显示所有可用命令

查看本机安装 Python 版本

使用如下命令查看本机安装版本

pyenv versions

星号表示当前正在使用的 Python 版本。使用 python -V 确认版本。

查看可安装 Python 版本

使用如下命令查看可安装版本

pyenv install -l

python 安装与卸载

$ pyenv install 2.7.3   # 安装 python
$ pyenv uninstall 2.7.3 # 卸载 python

python 切换

$ pyenv global 2.7.3  # 设置全局的 Python 版本,通过将版本号写入 ~/.pyenv/version 文件的方式。
$ pyenv local 2.7.3 # 设置 Python 本地版本,通过将版本号写入当前目录下的 .python-version 文件的方式。通过这种方式设置的 Python 版本优先级较 global 高。

python 优先级

shell > local > global

pyenv 会从当前目录开始向上逐级查找 .python-version 文件,直到根目录为止。若找不到,就用 global 版本。

$ pyenv shell 2.7.3 # 设置面向 shell 的 Python 版本,通过设置当前 shell 的 PYENV_VERSION 环境变量的方式。这个版本的优先级比 local 和 global 都要高。–unset 参数可以用于取消当前 shell 设定的版本。
$ pyenv shell --unset

$ pyenv rehash  # 创建垫片路径(为所有已安装的可执行文件创建 shims,如:~/.pyenv/versions/*/bin/*,因此,每当你增删了 Python 版本或带有可执行文件的包(如 pip)以后,都应该执行一次本命令)

pyenv-virtualenv

pyenv 插件:pyenv-virtualenv

使用自动安装 pyenv 后,它会自动安装部分插件,通过 pyenv-virtualenv 插件可以很好的和 virtualenv 结合:

einverne@ev  ~  cd ~/.pyenv/plugins
einverne@ev  ~/.pyenv/plugins   master  ll
total 24K
drwxr-xr-x 4 einverne einverne 4.0K Apr 22 10:55 pyenv-doctor
drwxr-xr-x 5 einverne einverne 4.0K Apr 22 10:55 pyenv-installer
drwxr-xr-x 4 einverne einverne 4.0K Apr 22 10:55 pyenv-update
drwxr-xr-x 7 einverne einverne 4.0K Apr 22 10:55 pyenv-virtualenv
drwxr-xr-x 4 einverne einverne 4.0K Apr 22 10:55 pyenv-which-ext
drwxr-xr-x 5 einverne einverne 4.0K Apr 22 10:54 python-build

创建虚拟环境

$ pyenv virtualenv 2.7.10 env-2.7.10

若不指定 python 版本,会默认使用当前环境 python 版本。如果指定 Python 版本,则一定要是已经安装过的版本,否则会出错。环境的真实目录位于 ~/.pyenv/versions 下

列出当前虚拟环境

pyenv virtualenvs
pyenv activate env-name  # 激活虚拟环境
pyenv deactivate #退出虚拟环境,回到系统环境

删除虚拟环境

pyenv uninstall my-virtual-env
rm -rf ~/.pyenv/versions/env-name  # 或者删除其真实目录

使用 pyenv 来管理 python,使用 pyenv-virtualenv 插件来管理多版本 python 包。此时,还需注意,当我们将项目运行的 env 环境部署到生产环境时,由于我们的 python 包是依赖 python 的,需要注意生产环境的 python 版本问题。

所有命令

$ pyenv commands
activate
commands
completions
deactivate
doctor
exec
global
help
hooks
init
install
installer
local
offline-installer
prefix
rehash
root
shell
shims
uninstall
update                 # 更新 pyenv 及插件
version
--version
version-file
version-file-read
version-file-write
version-name
version-origin
versions
virtualenv
virtualenv-delete
virtualenv-init
virtualenv-prefix
virtualenvs
whence
which

PyCharm

PyCharm 中可以非常方便的切换 Python 环境非常方便。但是因为个人原因 Java 和 Python 同时用的概率比较高,所以一直使用 Intellij IDEA 安装了 Python 插件。PyCharm 是为 Python 单独开发,所以如果不需要在语言之间切换用 PyCharm 还是比较方便的。

Tips

更换 pip 源

因为国内网络环境,如果在局域网内下载 pip 慢,可以尝试使用 aliyun 提供的镜像,创建 vim ~/.pip/pip.conf ,然后填入:

[global]
index-url = http://mirrors.aliyun.com/pypi/simple/

[install]
trusted-host=mirrors.aliyun.com

参考


2017-04-22 Python , pyenv , 经验总结

Redis 常用命令

Redis 是典型的 KV 数据库,通常所说的 Redis 数据结构指的是 Value 的数据结构,常用的数据结构有 String, Hash, List, Set, Sorted Set. 前三种类型不用多讲,几乎每种语言都存在,后两种 set 是单纯的集合, Sorted Set 是有序集合,在集合内可以根据 score 进行排序。Redis 的命令不区分大小写,但通常情况下使用大写以示区分。

几个常用网址:

对 Redis 键的命名格式并没有强制性的要求,不过一般约定为,”对象类型:对象 ID:对象属性“,比如使用 user:1:friends 表示 id 为 1 的用户的好友列表。为了方便后期维护,键的命名一定要有意义。

redis-cli 是 Redis 自带的命令行工具(类似于 MySQL 的 mysql 命令), 直接在命令行终端与 redis server 执行所有命令和返回响应。下面所有命令都可以在 cli 交互式命令行中执行。

交互式命令参数

redis-cli 命令行自带一些参数,可以使用 redis-cli --help 查看。

通常 -p 参数指定端口, -a 参数指定密码, -h 指定 hostname。

--stat 参数打印状态

如果本地没有安装 Redis,可以通过在线模拟尝试 Try Redis

基础命令

获取符合规则的键名列表

KEYS pattern

pattern 支持 glob 风格的通配符格式。

可以使用 EXISTS 命令来判断一个键是否存在

EXISTS key

使用 TYPE 键的数据类型

TYPE key

默认使用 cli 命令登录之后是 redis 的 0 数据库,可以使用 select 命令来切换数据库

SELECT 1

将 key 从一个数据库移动到另外一个数据库可以使用 move 命令

MOVE key <db index>

从所有 key 中随机选择一个 key

RANDOMKEY

重命名 key

RENAME key new_key

字符串类型操作命令

字符串类型,最大容量 512MB,字符串类型可以包含任何数据,图片的二进制,或者序列化的对象。

命令 行为
GET 获取存储在键中的值
SET 给 KEY 设置值
DEL 删除存储在 KEY 中的值,该命令可以用于所有类型

赋值与取值

将 value 关联到 key,如果 key 有值, SET 命令覆盖。对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

SET key value
GET key

使用 SETNX (set not exists) 可以实现如果 key 存在时不做任何操作

SETNX key value  # 如果 key 存在,则返回 0,如果设置成功返回 1

可以使用 SETEX 来针对 key 设置过期时间,以秒为单位

SETEX key seconds value

递增递减数字

让当前键值递增,操作键不存在时默认为 0,当键不是整数时,报错。对不存在的 key ,则设置 key 为 1

INCR key

通过 increment 参数来在 key 的基础上加上一个增量。

INCRBY key increment

递减数值,对于不存在的 key,设置为 -1

DECR key

递减一个量,DECRBY 为了增加可读性,完全可以使用 INCRBY 一个负值来实现同样的效果

DECRBY key decrement

增加指定浮点数

INCRBYFLOAT key increment

向尾部追加值

如果 key 已经存在,并且 value 是一个字符串,那么 APPEND 将 value 追加到末尾

APPEND key value

获取字符串长度

返回 key 所存储的字符串长度

STRLEN key

多 key 操作

获取多个值

MGET key [key ...]

设置多个 key value, 一次性设置多个值,返回 0 表示没有值被覆盖

MSET key value [key value ...]

其他复杂命令

SETRANGE key offset value  # 用 value 值覆盖给定 key 从 offset 开始所存储的字符串
MSETNX key value key value # 一次性设置多个值, SETNX 的 multi 版本
GETSET key 				# 设置 key 的值,并返回 key 的旧值
GETRANGE key start end  # 截取 start 和 end 偏移量的字符串

散列类型操作命令

通过 HSET 建立的键是散列类型,用过 SET 命令建立的是字符串类型。散列或者哈希非常适合存储对象,添加和删除操作复杂度平均 O(1).

命令 行为
HSET 给散列起给定的键值名
HGET 获取给定散列值
HGETALL 获取散列包含的所有键值对
HDEL 如果给定键存在,则移除该键

赋值取值

将哈希表 key 中的域 field 的值设为 value 。

如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。

如果域 field 已经存在于哈希表中,旧值将被覆盖。

HSET key field value
HGET key field

HMSET key field value [field value ...]
HMGET key field [field ...]

当字段不存在时赋值

HSETNX key field value

获取所有域和值

HGETALL key

查看哈希表 key 中,给定域 field 是否存在。

HEXISTS key field

增加数字,返回增值后的字段值

HINCRBY key field increment

删除一个或者多个字段,返回被删除的字段个数

HDEL key field [field ...]

获取指定 hash 的 field 数量

HKEYS key

获取指定 hash 的 values

HVALS key

列表类型

有序的字符串列表,向列表两端添加元素,或者获取列表的某一个片段。列表类型内部使用双向链表,向列表两端添加元素时间复杂度 O(1)

命令 行为
LPUSH 给定值加入列表左端
RPUSH 给定值加入列表右端
LRANGE 获取列表给定范围的所有值
LINDEX 获取列表在给定位置上的单个元素
LPOP 从列表左端弹出一个值,并返回被弹出的值

LPUSH 用来向列表左边增加元素,返回值表示增加元素后列表的长度,RPUSH 同理

LPUSH key value [value ...]
RPUSH key value [value... ]

从左边右边弹出元素

LPOP key
RPOP key

获取列表中元素的个数

LLEN key

获取列表中某一个片段

LRANGE key start stop

删除列表中指定的值

LREM key count value

获取设置指定索引的元素值

LINDEX key index

删除指定索引范围之外的所有元素。

LTRIM key start end

向列表中插入元素,将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。

LINSERT key BEFORE | AFTER pivot value

先执行 RPOP 命令再执行 LPUSH 命令

RPOPLPUSH source destination

集合类型

set 是集合,和数学中的集合概念相似。 Redis 的 set 是 String 类型的无序集合,set 元素最大可包含 2 的 32 次方个元素。

向集合中增加一个或者多个元素,如果不存在则创建,如果存在则忽略。SREM 用来从集合中删除一个或者多个元素,并返回删除成功的个数

命令详解,将给定元素添加到集合

SADD key member [member ...]

如果给定的元素存在于集合中,移除该元素

SREM key member [member ...]

随机返回并删除一个元素

SPOP key

随机返回一个元素,但是不删除, Redis 2.6 版本之后接受可选 count 参数。

SRANDMEMBER key [count]

返回集合中的所有元素

SMEMBERS key

判断元素是否在集合中

SISMEMBER key member

集合间运算

集合差集 A-B

SDIFF key [key ...]

集合交集运算 A 交 B

SINTER key [key ..]

集合并集 A 并 B

SUNION key [key...]

获得集合中的元素个数

SCARD key

将结果保存到 destination 键中

SDIFFSTORE destination key [key ...]
SINTERSTORE destination key [key ...]
SUNIONSTORE destination key [key...]

有序集合

在集合的基础上加上了排序,有序集合的键被称为成员 member,每个成员都是不同的,有序集合的值称为分值 score,分值必须为浮点数。

有序集合中加入一个元素和该元素的分数,如果元素存在则用新的分数替换

ZADD key score member [score member ...]

获得元素分数

ZSCORE key member

获取排名在某个范围的元素列表,按照元素分数从小到大顺序返回索引从 start 到 stop 之间的所有元素,包括两端。可选参数可返回元素分数。但 stop 为 -1 时返回全部。

ZRANGE key start stop [WITHSCORES]

按 score 从大到小

ZREVRANGE key start stop

元素分数从小到大顺序返回元素分数在 min 和 max 之间的元素

ZRANGEBYSCORE key min max

增加某个元素分数,返回值为更改过后的分数

ZINCRBY key increment member

获取集合中元素的数量,返回 integer 数量

ZCARD key

获得指定分数范围内的元素个数,返回个数

ZCOUNT key min max

删除一个或者多个元素,返回成功删除的元素数量

ZREM key member [member ...]

按照排名范围删除元素,元素分数从小到大顺序(索引 0 表示最小值),删除指定排名范围内的所有元素,并返回删除的数量

ZREMRANGEBYRANK key start stop

按照分数范围删除元素,删除指定分数范围内的所有元素,返回删除元素的数量

ZREMRANGEBYSCORE key min max

获得元素的排名,从小到大顺序,分数最小排名为 0。

ZRANK key member

获取元素的排名,从大到小

ZREVRANK key member

计算多个有序集合的交集,并将结果存储在 destination 键中,同样以有序集合存储,返回 destination 键中的元素个数

ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM | MIN | MAX]

AGGREGATE 是 SUM 时(默认值), destination 键中元素的分数是每个参与计算的集合中该元素分数的和。

其他情况同理,MIN 为最小值,MAX 为最大值

事务

Redis 中事务 transaction 是一组命令的集合。事务同命令一样都是 Redis 的最小执行单位。

MULTI
SADD ”user:1:following" 2
SADD "user:2:followers" 1
EXEC

事务中 WATCH 命令,监控一个或者多个键,一旦其中一个键被修改(或删除),之后的事务就不会执行,监控持续到 EXEC 命令

使用 DISCARD 命令来取消事务,DISCARD 命令来清空事务命令队列并退出事务上下文,回滚。

过期时间

关系型数据库一般需要额外设置一个字段“到期时间”,然后定期删除,而在 Redis 中可使用 EXPIRE 命令设置一个键的过期时间,到时间后 Redis 会自动删除它。

EXPIRE key seconds

返回 1 表示成功,0 为键不存在或者设置失效。 EXPIRE 命令参数必须为整数,最小单位为 1 秒,如果想要更加精确的控制过期时间可以使用 PEXPIRE 命令,单位为毫秒,也可以使用 PTTL 来以毫秒为单位返回剩余时间。

TTL key

TTL 命令查看键多久时间被删除,当键不存在时返回 -2,当键不过期时返回 -1

PERSIST key

取消键的过期时间,成功清除返回 1,否则返回 0

SORT 命令对列表类型,集合类型和有序集合类型键进行排序,可以完成关系型数据库中连接查询类似的任务。

SORT 命令时

  • 尽可能减少待排序键中的元素数量
  • 使用 LIMIT 参数只获取需要的数据
  • 排序的数据比较大,尽可能使用 STORE 参数将结果缓存

Redis Client

Redis 支持的客户端

https://redis.io/clients

2017-04-21 redis , database , 学习笔记

Redis 介绍

Redis (Remote Dictionary Server) 是由 Salvatore Sanfilippo(antirez) 开发的开源数据库,基于内存的 Key-Value 类型的 NoSQL 。在 DB Engines Ranking K-V 数据库中排行第一 1

Redis 是 REmote DIctionary Server 远程字典服务 的缩写,他以字典结构存储数据,并允许其他应用通过 TCP 协议来读写字典中的内容。

Redis 支持很多的特性:

  • 所有数据都必须放在内存中
  • 支持数据持久化:AOF 和 RDB 两种类型
  • 支持异步数据复制

Redis Cluster 常用 5 种数据结构 (String, Lists, Sets, Sorted Set, Hash) 以单进程方式处理请求,数据持久化和网络 Socket IO 等工作是异步进程

安装

源中安装

在 Debian/Ubuntu/Linux Mint 下直接安装即可,但是 redis 对内核有要求,如果安装失败的时候, -uname -a 看一下自己的内核,如果版本太低就升级一下。

sudo apt-get install redis-server

安装成功之后就可以使用

sudo service redis-server status # 查看当前状态
sudo service redis-server stop/start # 等等来控制 redis-server 的状态

最方便最快捷的安装方式,如果使用 docker 也可以使用 docker 中官方的源。

手动安装

官网下载 https://redis.io/download

下载最新的稳定版 Redis,可以从 http://download.redis.io/redis-stable.tar.gz 获取最新稳定版

curl -O http://download.redis.io/redis-stable.tar.gz

解压 tag.gz

tar xzvf redis-stable.tar.gz

进入解压后目录

cd redis-stable

编译和安装,运行 make 命令

make

当二进制文件编译完成之后,运行 test 确保一切都正确

make test

当所有测试跑通过之后安装到系统

sudo make install

运行 test 的时候报了一个错误:

*** [err]: Test replication partial resync: ok psync (diskless: yes, reconnect: 1) in tests/integration/replication-psync.tcl

参考该 issue 使用单核运行 test

taskset -c 1 sudo make test

配置 Redis

在 etc 目录下新建 redis 配置文件目录

sudo mkdir /etc/redis

将默认配置文件拷贝到配置目录

sudo cp redis.conf /etc/redis

编辑配置文件

sudo vim /etc/redis/redis.conf

修改 supervised 为 systemd

# If you run Redis from upstart or systemd, Redis can interact with your
# supervision tree. Options:
#   supervised no      - no supervision interaction
#   supervised upstart - signal upstart by putting Redis into SIGSTOP mode
#   supervised systemd - signal systemd by writing READY=1 to $NOTIFY_SOCKET
#   supervised auto    - detect upstart or systemd method based on
#                        UPSTART_JOB or NOTIFY_SOCKET environment variables
# Note: these supervision methods only signal "process is ready."
#       They do not enable continuous liveness pings back to your supervisor.
supervised systemd

接下来,寻找 dir 配置, 该参数制定 Redis 存储数据的目录,需要一个 Redis 有写权限的位置,使用 /var/lib/redis.

# The working directory.
#
# The DB will be written inside this directory, with the filename specified
# above using the 'dbfilename' configuration directive.
#
# The Append Only File will also be created inside this directory.
#
# Note that you must specify a directory here, not a file name.
dir /var/lib/redis

修改完毕,保存关闭。

创建 systemd unit

创建 redis.service 文件

sudo vim /etc/systemd/system/redis.service

[Unit] 单元中提供描述,和启动需要在网络可用之后。[Service] 中定义服务的具体动作,自定义用户 redis,以及 redis-server 的地址。

[Unit]
Description=Redis In-Memory Data Store
After=network.target

[Service]
User=redis
Group=redis
ExecStart=/usr/local/bin/redis-server /etc/redis/redis.conf
ExecStop=/usr/local/bin/redis-cli shutdown
Restart=always

[Install]
WantedBy=multi-user.target

创建 redis 用户,组

创建用户,组

sudo adduser --system --group --no-create-home redis

创建文件夹

sudo mkdir /var/lib/redis

给予权限

sudo chown redis:redis /var/lib/redis

修改权限,普通用户无法访问

sudo chmod 770 /var/lib/redis

运行 Redis

启动

sudo systemctl start redis

查看状态

sudo systemctl status redis

显示

sudo service redis status
● redis.service - Redis In-Memory Data Store
   Loaded: loaded (/etc/systemd/system/redis.service; disabled; vendor preset: enabled)
   Active: active (running) since Sat 2017-04-22 18:59:56 CST; 2s ago
 Main PID: 28750 (redis-server)
   CGroup: /system.slice/redis.service
           └─28750 /usr/local/bin/redis-server 127.0.0.1:6379

Apr 22 18:59:56 ev redis-server[28750]:   `-._    `-._`-.__.-'_.-'    _.-'
Apr 22 18:59:56 ev redis-server[28750]:       `-._    `-.__.-'    _.-'
Apr 22 18:59:56 ev redis-server[28750]:           `-._        _.-'
Apr 22 18:59:56 ev redis-server[28750]:               `-.__.-'
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower valu
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 # Server started, Redis version 3.2.8
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.ov
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 * DB loaded from disk: 0.000 seconds
Apr 22 18:59:56 ev redis-server[28750]: 28750:M 22 Apr 18:59:56.445 * The server is now ready to accept connections on port 6379

使用 redis-cli 客户端测试。

redis-cli

然后运行 ping ,会得到 PONG。

127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set test "It's working"
OK
127.0.0.1:6379> get test
"It's working"
127.0.0.1:6379> exit

然后重启 redis

sudo systemctl restart redis.service

然后进入 redis-cli:

127.0.0.1:6379> get test
"It's working"

如果能够获得,就证明配置好了。

开机启动

sudo systemctl enable redis

在启动了 redis 之后就可以再熟悉一下他的命令 了。

多数据库支持

Redis 实例提供了多个用来存储数据库的字典,客户端可以用来指定将数据存储在哪个数据库中,类似关系型数据库可以新建很多个数据库,可以将 Redis 的每一个字典都理解成为一个数据库。

每个数据库对外都是以一个从 0 开始的递增数字命名, Redis 默认支持 16 个数据库。 客户端与 Redis 建立连接之后会自动选择 0 号数据库,不过随时可以使用 SELECT 命令来更换数据库,比如选择 1 号数据库 SELECT 1.

注意:Redis 不支持自定义数据库名,每个数据库都以编号命名;Redis 也不支持为每一个数据库设置不同的访问密码;多个数据库之间并不是完全隔离, FLUSHALL 命令可以清空 Redis 实例中所有数据库数据。

reference

参考: https://www.digitalocean.com/community/tutorials/how-to-install-and-configure-redis-on-ubuntu-16-04


2017-04-20 redis , database , nosql , 学习笔记

每天学习一个命令:df 查看磁盘剩余空间

之前也介绍过 di disk information,不过系统默认不带,需要自己安装,如果遇到没有权限安装时,就可以使用 df 来查看当前机器剩余磁盘空间。

df 全称 disk filesystem,用于显示 Linux 系统磁盘利用率,通常也用来查看磁盘占用空间。

命令格式

df [OPTIONS] [FILE]

直接使用不加任何参数会显示所有当前被挂载的文件系统的可用空间。默认会以 1KB 为单位显示。

选项:

-a      全部文件系统列表
-h      方便阅读方式显示
-H      等于“-h”,但是计算式,1K=1000,而不是 1K=1024
-i      显示 inode 信息
-k      区块为 1024 字节
-l      只显示本地文件系统
-m      区块为 1048576 字节
--no-sync 忽略 sync 命令
-P      输出格式为 POSIX
--sync  在取得磁盘信息前,先执行 sync 命令
-T      文件系统类型

使用实例

直接使用

直接使用 df ,显示设备名称、总块数、总磁盘空间、已用磁盘空间、可用磁盘空间和文件系统上的挂载点。

Filesystem     1K-blocks      Used Available Use% Mounted on
udev             8126360         0   8126360   0% /dev
tmpfs            1629376     75432   1553944   5% /run
/dev/sdb1      240230912 185617700  42387064  82% /
tmpfs            8146864    546884   7599980   7% /dev/shm
tmpfs               5120         4      5116   1% /run/lock
tmpfs            8146864         0   8146864   0% /sys/fs/cgroup
/dev/loop1         83712     83712         0 100% /snap/core/4206
/dev/loop4        259584    259584         0 100% /snap/electronic-wechat/7
cgmfs                100         0       100   0% /run/cgmanager/fs
tmpfs            1629376        72   1629304   1% /run/user/1000
/dev/sda3      723180576     70584 686351464   1% /media/user/add8bd89-da2a-4573-ac6e-7ec44f8c5414
/dev/loop5         84096     84096         0 100% /snap/core/4327
/dev/loop3         95872     95872         0 100% /snap/slack/6
/dev/loop6         88704     88704         0 100% /snap/core/4407

df 命令输出:

  • 第一列是文件系统对应的设备文件路径名,一般是硬盘分区名
  • 第二列是分区包含的数据块数目
  • 第三、四列分别表示已用和可用的数据块数目,三、四列块数之和不等于第二列块数,缺省每个分区都会预留少量空间给系统管理员使用
  • 第五列,Use% 表示普通用户空间占用百分比
  • 最后一列,表示文件系统挂载点

优化输出,以更加易读的方式输出结果

df -h 可以显示比较友好的输出

Filesystem      Size  Used Avail Use% Mounted on
udev            7.8G     0  7.8G   0% /dev
tmpfs           1.6G   74M  1.5G   5% /run
/dev/sdb1       230G  178G   41G  82% /
tmpfs           7.8G  534M  7.3G   7% /dev/shm
tmpfs           5.0M  4.0K  5.0M   1% /run/lock
tmpfs           7.8G     0  7.8G   0% /sys/fs/cgroup
/dev/loop1       82M   82M     0 100% /snap/core/4206
/dev/loop4      254M  254M     0 100% /snap/electronic-wechat/7
cgmfs           100K     0  100K   0% /run/cgmanager/fs
tmpfs           1.6G   72K  1.6G   1% /run/user/1000
/dev/sda3       690G   69M  655G   1% /media/mi/add8bd89-da2a-4573-ac6e-7ec44f8c5414
/dev/loop5       83M   83M     0 100% /snap/core/4327
/dev/loop3       94M   94M     0 100% /snap/slack/6
/dev/loop6       87M   87M     0 100% /snap/core/4407

df -hT 其中 -T 参数显示文件类型 ext4 等等

Filesystem     Type      Size  Used Avail Use% Mounted on
udev           devtmpfs  7.8G     0  7.8G   0% /dev
tmpfs          tmpfs     1.6G   74M  1.5G   5% /run
/dev/sdb1      ext4      230G  178G   41G  82% /

说明:

  • -h 目前磁盘空间和使用情况 以更易读的方式显示
  • -H 和上面的 -h 参数相同,换算时采用 1000 而不是 1024 进行容量转换
  • -k 以单位 1K 显示磁盘的使用情况

用 inode 方式显示磁盘使用情况

使用 -i 参数

df -i

输出结果为

Filesystem       Inodes   IUsed    IFree IUse% Mounted on
udev            2031887     539  2031348    1% /dev
tmpfs           2036709     941  2035768    1% /run
/dev/sdb1      15269888 1896147 13373741   13% /
tmpfs           2036709     497  2036212    1% /dev/shm
tmpfs           2036709       4  2036705    1% /run/lock
tmpfs           2036709      18  2036691    1% /sys/fs/cgroup
/dev/loop0        28782   28782        0  100% /snap/electronic-wechat/7
cgmfs           2036709      14  2036695    1% /run/cgmanager/fs
tmpfs           2036709     132  2036577    1% /run/user/1000
/dev/loop8        12810   12810        0  100% /snap/core/6034
/dev/sda1      14082048 1673302 12408746   12% /media/mi/3d1b7e3e-c184-4664-9555-2b088997f2c8
/dev/sda3      45932544      11 45932533    1% /media/mi/8803a3c6-1561-4957-b9b3-e60d5688d1a6
/dev/sdc       12229708      20 12229688    1% /media/mi/data

inode (index node) 是一个在类 Unix 文件系统下用来描述文件系统对象(文件或者目录)的数据结构。每一个 indoe 保存对象数据的属性和磁盘块地址。文件类型对象属性包括 metadata(修改时间,访问属性等)和文件的所有者以及文件权限。

df -ih 显示 inodes

Filesystem     Inodes IUsed IFree IUse% Mounted on
udev             2.0M   520  2.0M    1% /dev
tmpfs            2.0M   888  2.0M    1% /run
/dev/sdb1         15M  1.8M   13M   12% /

输出磁盘文件系统

常见的文件系统有 Windows 下的 FAT32,NTFS,Unix 系统下的 ext3, ext4,添加 -T 参数在输出结果中增加一列来表示当前分区的文件系统。

df -T

而如果要在结果中筛选特定文件系统的分区可以使用 -t ext4 ,比如要过滤出只显示 ext4 分区

df -t ext4

相关

查看磁盘占用 du

reference

  • man df

2017-04-12 linux , df , disk , 磁盘空间 , command

Celery 使用介绍

Celery 简单来说就是一个分布式消息队列。简单、灵活且可靠,能够处理大量消息,它是一个专注于实时处理的任务队列,同时也支持异步任务调度。Celery 不仅可以单机运行,也能够同时在多台机器上运行,甚至可以跨数据中心。

Celery 中比较关键的概念:

  • worker: worker 是一个独立的进程,任务执行单元,它持续监视队列中是否有需要处理的任务;
  • broker: broker 消息传输中间件,任务调度队列,接收生产者发出的消息,将任务存入队列,broker 负责协调客户端和 worker 的沟通。客户端向队列添加消息,broker 负责把消息派发给 worker。
  • 任务模块:包含异步任务和定时任务,异步任务通常在业务逻辑中被触发并发往任务队列,定时任务由 celery beat 进程周期性发送
  • 任务结果 backend:backend 存储任务执行结果,同消息中间件一样,需要由其他存储系统提供支持

一个典型的 Celery 使用场景就是,当用户在网站注册时,请求可以立即返回而不用等待发送注册激活邮件之后返回,网站可以将发送邮件这样的耗时不影响主要流程的操作放到消息队列中,Celery 就提供了这样的便捷。

安装 Celery

直接使用 python 工具 pip 或者 easy_install 来安装:

$ pip install celery

安装 Broker

Celery 支持多种 broker, 但主要以 RabbitMQ 和 Redis 为主,其他都是试验性的,虽然也可以使用, 但是没有专门的维护者。如何在 RabbitMQ 和 Redis 之间选择呢?

RabbitMQ is feature-complete, stable, durable and easy to install. It’s an excellent choice for a production environment.

Redis is also feature-complete, but is more susceptible to data loss in the event of abrupt termination or power failures.

Celery 官方明确表示推荐在生产环境下使用 RabbitMQ,Redis 存在丢数据的问题。所以如果你的业务可以容忍 worker crash 或者电源故障导致的任务丢失,采用 redis 是个不错的选择,本篇就以 redis 为例来介绍。

Celery 对于 redis 的支持需要安装相关的依赖,以下命令可以同时安装 celery 和 redis 相关的依赖,但是 redis server 还是必须单独安装的。

$ pip install -U celery[redis]  # -U 的意思是把所有指定的包都升级到最新的版本

Celery 的配置和使用

Celery 本身的配置项是很多的,但是如果要让它跑起来,你只需要加一行配置:

BROKER_URL = 'redis://localhost:6379/0'

这一行就是告诉 celery broker 的地址和选择的 redis db,默认是 0。接下来用个很简单的例子来介绍 celery 是如何使用的:

# task.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/0'

app = Celery('tasks', broker=broker)

@app.task()
def add(x, y):
   return x + y

上述代码创建了一个 celery 的实例 app,可以通过它来创建任务和管理 workers。在上面的例子中,我们创建了一个简单的任务 task,它返回了两个数相加后的结果。然后启动 celery 服务,通过它来监听是否有任务要处理。

$ celery worker -A task -l info
  • -A 选项指定 celery 实例 app 的位置,本例中 task.py 中自动寻找,当然可以直接指定 celery worker -A task.app -l info
  • -l 选项指定日志级别, -l--loglevel 的缩略形式

其他更多选项通过 celery worker –-help 查看

调用任务或者发送消息

然后我们再打开一个 shell 窗口,进入 python 控制台去调用 add 任务:

>>> from task import add
>>> add.delay(1, 2)
<AsyncResult: 42ade14e-c7ed-4b8d-894c-1ca1ec7ca192>

delay()apply_async 的简写,用于一个任务消息(task message)。我们发现 add 任务并没有返回结果 3,而是一个对象 AsyncResult,它的作用是被用来检查任务状态,等待任务执行完毕或获取任务结果,如果任务失败,它会返回异常信息或者调用栈。

我们先尝试获取任务的执行结果:

>>> result = add.delay(1, 2)
>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 604, in _is_disabled
    'No result backend configured.  '
NotImplementedError: No result backend configured.  Please see the documentation for more information.

报错了:No result backend configured. 错误信息告诉我们没有配置 result backend。因为 celery 会将任务的 状态或结果保存在 result backend,result backend 的选择也有很多,本例中依然选用 redis 作为 result backend。

我们修改 task.py 的代码,添加上 result backend 的设置,保存后重启 celery worker。

# task.py
...
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost//')
...

然后重新调用 add task:

>>> from task import add
>>> result = add.delay(1,2)
>>> result.get()
3

Celery Flower

flower 是一个 celery 的监控工具,它提供了一个图形用户界面,可以极大的方便我们监控任务的执行过程, 执行细节及历史记录,还提供了统计功能。

flower 安装

pip install flower

or:

easy_install flower

flower 使用简介,首先启动通过命令行启动 flower 进程:

flower -A proj --port=5555

然后打开浏览器 http://localhost:5555/

celery flower

Celery 任务类型

apply_async

调用一个异步任务,这也是最常用的任务类型之一,delay 与它的作用相同,只是 delay 不支持 apply_async 中额外的参数。该方法有几个比较重要的参数,在实际应用中会经常用到:

countdown: 任务延迟执行的秒数,默认立即执行; eta:任务被执行的绝对时间

crontab 计划任务

Celery 同样也支持定时任务:

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (20, 20),
    },
    # execute every ten minutes
   'every_ten_minutes': {
        'task': 'worker.cntv.cntv_test',
        'schedule': timedelta(minutes=10),
        'args': ('args1',),
        'options': {
            'queue': 'queue_name'
        }
    },
}

要启动定时任务,需要启动一个心跳进程,假设

celery beat -A celery_app.celery_config -s /path/to/celerybeat-schedule -l info

其中 -s 参数指定 celerybeat 文件保存的位置。beat 主要的功能就是将 task 下发到 broker 中,让 worker 去消费。

取消队列中任务

取消队列中任务,可以使用命令行,也可以导入 celery app 然后使用 control()

celery -A proj -Q queue_name purge      # 取消队列 queue_name 中的任务
# or
from proj.celery import app
app.control.purge()

From:stackoverflow

celery 在 supervisor 中 root 不能启动问题

Celery 不能用 root 用户启动,所以在 supervisor 中启动时会报错:

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0

解决办法

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

或者 supervisor 配置中

environment=C_FORCE_ROOT="true"

reference


2017-04-10 celery , python , queue , task , distribution

电子书

最近文章

  • Vim 插件之全局搜索:ack.vim 这篇文章看开始陆陆续续记录一下用过的 Vim Plugin,虽然有些一直也在用但从没有好好整理过,正好这篇开一个计划吧。
  • 我可能要抛弃用了很多年的 Chrome 换用 Vivaldi 大概一两年前就听说了 Vivaldi 这样一款浏览器,它使用 Chromium 做内核,创始人是从 Opera 浏览器出来的,不满 Chrome 横扫浏览器市场,没有给 Opera 一点喘息的机会,很多 Opera 的特性在 Chrome 上都被摘掉了。所以他们就开始搞了这一个 Vivaldi 浏览器,我记得当时听到的第一个宣传语是,快,想来 Chrome 最初打开市场时的宣传语也是快,但是多少年过去了 Chrome 虽然自身保持非常干净,但是加上自定义的扩展,也变得越来越臃肿了,但倒是也在一个可以接受的范围内。两年前 Vivaldi 依靠这一条宣传没有吸引到我,但是我也下载尝试,并且这两年来一直存在我的电脑中。我记得之前在我看来最大的缺点便是不能同步数据,而这个功能也在之前的更新中被加上了。所以今天就非常值得拿出来讲一讲。
  • 使用 port knocking 隐藏 SSH daemon 端口 暴露在互联网上的服务器非常容易被恶意程序进行端口扫描,以前也整理过一篇 VPS 安全设置 的文章,但都是一些比较基础的设置,能够绕过一些简单的端口扫描,但是并不能从根本上解决端口扫描的问题。
  • 威联通折腾篇十四:迁移系统盘 当时安装系统的时候就直接插入了一块硬盘,安装在了第一块机械硬盘上面,虽然读写也没有遇到什么瓶颈,但是记录以做备份,可以用于将系统迁移到 SSD 上。
  • Intellij IDEA 支持 jQuery 在 设置中进行如下操作开启 jQuery 支持: