layout: post title: “snakemake使用笔记” date: 2021-10- 26 description: “snakemake是一个基于python3的任务流程工具,执行流程脚本简单易懂,支持断点运行、并行运算、内存控制、cpu核心控制、流程控制等特点”
snakemake命令与规则
rule
rule
是Snakefile中最主要的部分。每一个rule定义了pipeline流程中运行的一步,每一个rule都可以当作一个shell脚本来处理,一般主要包括 input
、output
、shell
3个部分。
-
rule all
不同于其他的rule,在
rule all
里面一般不会去定义要执行的命令,他一般用来定义最后的输出结果文件,rule all中的input是流程的最终目标文件。这个rule all 必须要有,不然整个pipeline不会运行。rule all: input: #"merged.txt" 取消注释后,则能正常输出文件 rule concat: input: expand("{file}.txt", file=["hello", "world"]) output: "merge.txt" shell: "cat {input} > {output}"
-
wildcards
用来获取通配符匹配到的部分,例如对于通配符
"{dataset}/file.{group}.txt"
匹配到文件101/file.A.txt
,则{wildcards.dataset}
就是101,{wildcards.group}
就是A。 -
threads
通过在rule里面指定
threads
参数来指定分配给程序的线程数,egthreads: 8
。 -
resources
可用来指定程序运行的内存,eg.
resources: mem_mb=800
。 -
message
使用
message
参数可以指定每运行到一个rule时,在终端中给出提示信息,eg.message: "starting mapping ..."
。 -
priority
可用来指定程序运行的优先级,默认为0,eg.
priority: 20
。 -
log
用来指定生成的日志文件,eg.
log: "logs/concat.log"
。 -
params
指定程序运行的参数,eg.
params: cat="-n"
,调用方法为{params.cat}
。 -
run
在
run
的缩进区域里面可以输入并执行python代码。 -
scripts
用来执行指定脚本。script: 'scripts/script.py'; 'script/script.R'
-
temp
通过
temp
方法可以在所有rule
运行完后删除指定的中间文件,如果下一步成功运行,那么就删掉这一步的输出文件,这样去掉没有必要保留的中间文件,就会为我们省掉一定的磁盘空间,大规模分析时候非常方便。fw = temp(WORKING_DIR + "fastq/{sample}_1.fastq"), rev= temp(WORKING_DIR + "fastq/{sample}_2.fastq")
-
protected
用来指定某些中间文件是需要保留的,eg.
output: protected("f1.bam")
。这里protected的意思就是输出文件很重要,我们想把访问权限改成只读,以防止误删和修改。bams = protected(WORKING_DIR + "mApped/{sample}.bam")
-
ancient
重复运行执行某个Snakefile时,snakemake会通过比较输入文件的时间戳是否更改(比原来的新)来决定是否重新执行程序生成文件,使用ancient方法可以强制使得结果文件
-
Rule Dependencies
可通过快捷方式指定前一个rule的输出文件为此rule的输入文件
rule a: input: "path/to/input" output: "path/to/output" shell: ... rule b: input: rules.a.output #直接通过rules.a.output 指定rule a的输出 output: "path/to/output/of/b" shell: ...
-
report
使用snakemake定义的
report
函数可以方便的将结果嵌入到一个HTML文件中进行查看。rule report: input: "calls/all.vcf" output: "report.html" run: from snakemake.utils import report with open(input[0]) as vcf: n_calls = sum(1 for l in vcf if not l.startswith("#")) report(""" An example variant calling workflow =================================== Reads were mapped to the Yeast reference genome and variants were called jointly with SAMtools/BCFtools. This resulted in {n_calls} variants (see Table T1_). """, output[0], T1=input[0])
-
expand
expand是一个特殊函数,它的目的是用通配符,将所有的SRA ID 传入。
qcfile = expand(RESULT_DIR + "fastp/{sample}.html",sample=SAMPLES)
-
自动创建文件夹
在写Snakemake的主体代码部分时候我们只给出了文件夹链接,并不需要自己创建这些文件
配置文件
每计算一次数据都要重写一次Snakefile有时可能会显得有些繁琐,我们可以将那些改动写入配置文件,使用相同流程计算时,将输入文件的文件名写入配置文件然后通过Snakefile读入即可。配置文件有两种书写格式——json和yaml。在Snakefile中读入配置文件使用如下方式:
configfile: "path/to/config.json"
configfile: "path/to/config.yaml"
# 也可直接在执行snakemake命令时指定配置
$ snakemake --config yourparam=1.5
# config.yaml内容为:
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq
# snakemake调用配置文件参数
configfile: "config.yaml"
...
rule bcftools_call:
input:
fa="data/genome.fa",
bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
bai=expand("sorted_reads/{sample}.bam.bai", sample=config["smaples"])
output:
"calls/all.vcf"
shell:
"samtools mpileup -g -f {input.fa} {input.bam} | "
"bcftools call -mv - > {output}"
# 虽然samples是一个字典,但是展开的时候,只会使用他们的key值部分。
rule bwa_map:
input:
"data/genome.fa",
lambda wildcards: config["samples"][wildcards.sample]
output:
"mapped_reads/{sample}.bam"
threads: 8
shell:
"bwa mem -t {threads} {input} | samtools view -Sb - > {output}"
snakemake执行
一般讲所有的参数配置写入Snakefile后直接在Snakefile所在路径执行snakemake
命令即可开始执行流程任务。一些常用的参数:
--snakefile, -s 指定Snakefile,否则是当前目录下的Snakefile
--dryrun, -n 不真正执行,一般用来查看Snakefile是否有错
--printshellcmds, -p 输出要执行的shell命令
--reason, -r 输出每条rule执行的原因,默认FALSE
--cores, --jobs, -j 指定运行的核数,若不指定,则使用最大的核数
--force, -f 重新运行第一条rule或指定的rule
--forceall, -F 重新运行所有的rule,不管是否已经有输出结果
--forcerun, -R 重新执行Snakefile,当更新了rule时候使用此命令
#一些可视化命令
$ snakemake --dag | dot -Tpdf > dag.pdf
#集群投递
snakemake --cluster "qsub -V -cwd -q 节点队列" -j 10
# --cluster /-c CMD: 集群运行指令
# qusb -V -cwd -q, 表示输出当前环境变量(-V),在当前目录下运行(-cwd), 投递到指定的队列(-q), 如果不指定则使用任何可用队列
# --local-cores N: 在每个集群中最多并行N核
# --cluster-config/-u FILE: 集群配置文件
snakemake运行逻辑
当Snakemake收到rule_all
定义的需要输出文件后, 首先看这些输出文件是否存在,如果不存在是哪个rule
产生了这些output
,然后去运行相应的rule
。
例如,我们在rule all
里写了需要输出bam文件,那么Snakemake
会先去看是rule hisat_mApping
产生了BAM,所以要先去运行rule hisat_mApping
。在运行rule hisat_mApping
的时候,它会发现rule hisat_mApping
需要另外的input trimed fastq
。
所以下一步是顺藤摸瓜,去看trimed fastq是哪个rule产生的,如此循环往复,直到回溯到第一步的SRA ID文件。运行的逻辑就像剥洋葱一样倒着把pipeline串起来。 如果是因为某一个原因pipeline中断了,假如运行到第二步 trimed fastq已经产生了,但是断电了。下次重新启动的时候,因为Snakemake是回溯后针对性的来运行,所以回溯到第二步trimed fastq,发现了这个文件,就不会在往前回溯了,保证了从断点处继续跑。
它执行分为三个阶段:
-
在初始化阶段,工作流程会被解析,所有规则都会被实例化
-
在DAG阶段,也就是生成有向无环图,确定依赖关系的时候,所有的通配名部分都会被真正的文件名代替。
-
在调度阶段,DAG的任务按照顺序执行。
也就是说在初始化阶段,我们是无法获知通配符所指代的具体文件名,必须要等到第二阶段,才会有wildcards变量出现。也就是说之前的出错的原因都是因为第一个阶段没通过。这个时候就需要输入函数推迟文件名的确定,可以用Python的匿名函数,也可以是普通的函数