本文目录一览:
- 1、python 多线程logger问题
- 2、python 运维常用脚本
- 3、单细胞转录组双细胞判别软件scDblFinder
- 4、PYTHON!!!!!!!!!!!!!!!!!!!!!!!
- 5、关于python的几个小编程 急!
python 多线程logger问题
因为logging是threadsafe的,但不是process-safe(应该没有这个词儿,只是为了便于理解)的。这段代码就是多个进程共同操作一个日志文件。这种情况下,logging的行为就很难说了。
我测试了一下,日志中大概几百行。而且,可以看到一些顺序错乱现象:
Fri, 08 Aug 2014 01:19:38 logging_in_multithread.py[line:40] theadWorking ERROR 2
FFri, 08 Aug 2014 01:19:36 logging_in_multithread.py[line:40] theadWorking ERROR 11(注意这里的FFri)
把代码这样改:
for num in range(processNum):
p = Process(target=processWorking, args=('2',))
processs.append(p)
p.start()
p.join()
还有其他方法,比如:为logging实现一个FileHandler,以使logging在multiple process的环境下也能正常工作。这是我从网上了解到的做法,自己还没实践过。
Python Manual中logging Cookbook中有这么一段话:
Logging to a single file from multiple processes
Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.)
这段话中也提出了另外一种解决方案。
python 运维常用脚本
Python 批量遍历目录文件,并修改访问时间
import os
path = "D:/UASM64/include/"
dirs = os.listdir(path)
temp=[];
for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 实现的自动化服务器管理
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
def ssh_put(user,passwd,source,target):
while True:
try:
shell=str(input("[Shell] # "))
if (shell == ""):
continue
elif (shell == "exit"):
exit()
elif (shell == "put"):
ssh_put("root","123123","./a.py","/root/a.py")
elif (shell =="cron"):
temp=input("输入一个计划任务: ")
temp1="(crontab -l; echo "+ temp + ") |crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
elif (shell == "uncron"):
temp=input("输入要删除的计划任务: ")
temp1="crontab -l | grep -v " "+ temp + "|crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
else:
ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)
遍历目录和文件
import os
def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夹下所有的目录与文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files
a=list_all_files("C:/Users/LyShark/Desktop/a")
print(a)
python检测指定端口状态
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
for ip in range(0,254):
try:
sk.connect(("192.168.1."+str(ip),443))
print("192.168.1.%d server open \n"%ip)
except Exception:
print("192.168.1.%d server not open"%ip)
sk.close()
python实现批量执行CMD命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print("------------------------------\n")
print("使用说明,在当前目录创建ip.txt写入ip地址")
print("------------------------------\n")
user=input("输入用户名:")
passwd=input("输入密码:")
port=input("输入端口:")
cmd=input("输入执行的命令:")
file = open("./ip.txt", "r")
line = file.readlines()
for i in range(len(line)):
print("对IP: %s 执行"%line[i].strip('\n'))
python3-实现钉钉报警
import requests
import sys
import json
dingding_url = ' '
data = {"msgtype": "markdown","markdown": {"title": "监控","text": "apche异常"}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
import psutil
import requests
import time
import os
import json
monitor_name = set(['httpd','cobblerd']) # 用户指定监控的服务进程名称
proc_dict = {}
proc_name = set() # 系统检测的进程名称
monitor_map = {
'httpd': 'systemctl restart httpd',
'cobblerd': 'systemctl restart cobblerd' # 系统在进程down掉后,自动重启
}
dingding_url = ' '
while True:
for proc in psutil.process_iter(attrs=['pid','name']):
proc_dict[proc.info['pid']] = proc.info['name']
proc_name.add(proc.info['name'])
判断指定端口是否开放
import socket
port_number = [135,443,80]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
print("Port %d is not open" % index)
sock.close()
判断指定端口并且实现钉钉轮询报警
import requests
import sys
import json
import socket
import time
def dingding(title,text):
dingding_url = ' '
data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('127.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
return index
sock.close()
while True:
dingding("Warning",net_scan())
time.sleep(60)
python-实现SSH批量CMD执行命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, "r")
line = file.readlines()
for i in range(len(line)):
print("对IP: %s 执行"%line[i].strip('\n'))
ssh.connect(hostname=line[i].strip('\n'),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")
用python写一个列举当前目录以及所有子目录下的文件,并打印出绝对路径
import sys
import os
for root,dirs,files in os.walk("C://"):
for name in files:
print(os.path.join(root,name))
os.walk()
按照这样的日期格式(xxxx-xx-xx)每日生成一个文件,例如今天生成的文件为2013-09-23.log, 并且把磁盘的使用情况写到到这个文件中。
import os
import sys
import time
new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()
str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)
f.flush()
f.close()
统计出每个IP的访问量有多少?(从日志文件中查找)
import sys
list = []
f = open("/var/log/httpd/access_log","r")
str1 = f.readlines()
f.close()
for i in str1:
ip=i.split()[0]
list.append(ip)
list_num=set(list)
for j in list_num:
num=list.count(j)
print("%s ----- %s" %(num,j))
写个程序,接受用户输入数字,并进行校验,非数字给出错误提示,然后重新等待用户输入。
import tab
import sys
while True:
try:
num=int(input("输入数字:").strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print("您输入的不是数字")
except KeyboardInterrupt:
sys.exit("\n")
ps 可以查看进程的内存占用大小,写一个脚本计算一下所有进程所占用内存大小的和。
import sys
import os
list=[]
sum=0
str1=os.popen("ps aux","r").readlines()
for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num
print("%s --- %s"%(list[0],sum))
关于Python 命令行参数argv
import sys
if len(sys.argv) 2:
print ("没有输入任何参数")
sys.exit()
if sys.argv[1].startswith("-"):
option = sys.argv[1][1:]
利用random生成6位数字加字母随机验证码
import sys
import random
rand=[]
for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result="".join(rand)
print(result)
自动化-使用pexpect非交互登陆系统
import pexpect
import sys
ssh = pexpect.spawn('ssh lyshark@59.110.167.239')
fout = file('sshlog.txt', 'w')
ssh.logfile = fout
ssh.expect("lyshark@59.110.167.239's password:")
ssh.sendline("密码")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
Python-取系统时间
import sys
import time
time_str = time.strftime("日期:%Y-%m-%d",time.localtime())
print(time_str)
time_str= time.strftime("时间:%H:%M",time.localtime())
print(time_str)
psutil-获取内存使用情况
import sys
import os
import psutil
memory_convent = 1024 * 1024
mem =psutil.virtual_memory()
print("内存容量为:"+str(mem.total/(memory_convent))+"MB\n")
print("已使用内存:"+str(mem.used/(memory_convent))+"MB\n")
print("可用内存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB\n")
print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB\n")
print("cache容量:"+str(mem.cached/(memory_convent))+"MB\n")
Python-通过SNMP协议监控CPU
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split('\n')[:-1]
return sn1
def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')
if name == ' main ':
Python-通过SNMP协议监控系统负载
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
import sys
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')
return sn1
def getload(host,loid):
load_oids = '1.3.6.1.4.1.2021.10.1.3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]
if name == ' main ':
Python-通过SNMP协议监控内存
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
def getAllitems(host, oid):
def getSwapTotal(host):
def getSwapUsed(host):
def getMemTotal(host):
def getMemUsed(host):
if name == ' main ':
Python-通过SNMP协议监控磁盘
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
def getDate(source,newitem):
def getRealDate(item1,item2,listname):
def caculateDiskUsedRate(host):
if name == ' main ':
Python-通过SNMP协议监控网卡流量
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('\n')[:-1]
return sn1
def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []
def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []
if name == ' main ':
Python-实现多级菜单
import os
import sys
ps="[None]-"
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
flage=1
while True:
ps="[None]-"
temp=input(ps)
if (temp=="test"):
print("test page !!!!")
elif(temp=="user"):
while (flage == 1):
ps="[User]-"
temp1=input(ps)
if(temp1 =="exit"):
flage=0
break
elif(temp1=="show"):
for i in range(len(ip)):
print(i)
Python实现一个没用的东西
import sys
ps="[root@localhost]# "
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
while True:
temp=input(ps)
temp1=temp.split()
检查各个进程读写的磁盘IO
import sys
import os
import time
import signal
import re
class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0
def main():
argc = len(sys.argv)
if argc != 1:
print ("usage: please run this script like [./lyshark.py]")
sys.exit(0)
if os.getuid() != 0:
print ("Error: This script must be run as root")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system('echo 1 /proc/sys/vm/block_dump')
print ("TASK PID READ WRITE")
while True:
os.system('dmesg -c /tmp/diskio.log')
l = []
f = open('/tmp/diskio.log', 'r')
line = f.readline()
while line:
m = re.match(
'^(\S+)(\d+)(\d+): (READ|WRITE) block (\d+) on (\S+)', line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == "READ":
item.reads = item.reads + 1
elif m.group(3) == "WRITE":
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print ("%-10s %10s %10d %10d" %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system('echo 0 /proc/sys/vm/block_dump')
sys.exit(0)
if name ==" main ":
main()
利用Pexpect实现自动非交互登陆linux
import pexpect
import sys
ssh = pexpect.spawn('ssh root@59.110.167.239')
fout = file('sshlog.log', 'w')
ssh.logfile = fout
ssh.expect("root@59.110.167.239's password:")
ssh.sendline("密码")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
利用psutil模块获取系统的各种统计信息
import sys
import psutil
import time
import os
time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"
if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , "w" )
else :
handle = open ( file_name , "a" )
if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2
def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False
print_str = "";
if ( print_type == 1 ) or isset( sys.argv,"mem" ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += " 内存状态如下:\n"
print_str = print_str + " 系统的内存容量为: "+str( mem.total/( memory_convent ) ) + " MB\n"
print_str = print_str + " 系统的内存以使用容量为: "+str( mem.used/( memory_convent ) ) + " MB\n"
print_str = print_str + " 系统可用的内存容量为: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB\n"
print_str = print_str + " 内存的buffer容量为: "+str( mem.buffers/( memory_convent ) ) + " MB\n"
print_str = print_str + " 内存的cache容量为:" +str( mem.cached/( memory_convent ) ) + " MB\n"
if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
print_str += " CPU状态如下:\n"
cpu_status = psutil.cpu_times()
print_str = print_str + " user = " + str( cpu_status.user ) + "\n"
print_str = print_str + " nice = " + str( cpu_status.nice ) + "\n"
print_str = print_str + " system = " + str( cpu_status.system ) + "\n"
print_str = print_str + " idle = " + str ( cpu_status.idle ) + "\n"
print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + "\n"
print_str = print_str + " irq = " + str( cpu_status.irq ) + "\n"
print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + "\n"
print_str = print_str + " steal = " + str ( cpu_status.steal ) + "\n"
print_str = print_str + " guest = " + str ( cpu_status.guest ) + "\n"
if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
print_str += " 硬盘信息如下:\n"
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + " "+ str( item ) + "\n"
if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
print_str += " 登录用户信息如下:\n "
user_status = psutil.users()
for item in user_status :
print_str = print_str + " "+ str( item ) + "\n"
print_str += "---------------------------------------------------------------\n"
print ( print_str )
handle.write( print_str )
handle.close()
import psutil
mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 输出获取SWAP分区信息
cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches
psutil.cpu_times(percpu=True) # 输出每个核心的详细CPU信息
psutil.cpu_times().user # 获取CPU的单项数据 [用户态CPU的数据]
psutil.cpu_count() # 获取CPU逻辑核心数,默认logical=True
psutil.cpu_count(logical=False) # 获取CPU物理核心数
psutil.disk_partitions() # 列出全部的分区信息
psutil.disk_usage('/') # 显示出指定的挂载点情况【字节为单位】
psutil.disk_io_counters() # 磁盘总的IO个数
psutil.disk_io_counters(perdisk=True) # 获取单个分区IO个数
psutil.net_io_counter() 获取网络总的IO,默认参数pernic=False
psutil.net_io_counter(pernic=Ture)获取网络各个网卡的IO
psutil.pids() # 列出所有进程的pid号
p = psutil.Process(2047)
p.name() 列出进程名称
p.exe() 列出进程bin路径
p.cwd() 列出进程工作目录的绝对路径
p.status()进程当前状态[sleep等状态]
p.create_time() 进程创建的时间 [时间戳格式]
p.uids()
p.gids()
p.cputimes() 【进程的CPU时间,包括用户态、内核态】
p.cpu_affinity() # 显示CPU亲缘关系
p.memory_percent() 进程内存利用率
p.meminfo() 进程的RSS、VMS信息
p.io_counters() 进程IO信息,包括读写IO数及字节数
p.connections() 返回打开进程socket的namedutples列表
p.num_threads() 进程打开的线程数
import psutil
from subprocess import PIPE
p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()
psutil.users() # 显示当前登录的用户,和Linux的who命令差不多
psutil.boot_time() 结果是个UNIX时间戳,下面我们来转换它为标准时间格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的结果不是str格式,继续进行转换 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')
Python生成一个随机密码
import random, string
def GenPassword(length):
if name == ' main ':
print (GenPassword(6))
单细胞转录组双细胞判别软件scDblFinder
起因: 最近有个问题样本,跑完cellranger,样本的cellranger结果如下,细胞数目极高(3W+)。在后续数据质控分析中,线粒体基因占比和双细胞率均很高,用scDblFinder进行双细胞预测,双细胞占率竟然高达34%。我很好奇,双细胞率为什么会这么高,如何审视这个结果?决定看看scDblFinder的细节。
问题1:如何理解Doublets?
在scRNA-seq的细胞捕获步骤中,两个或多个细胞聚集成单个液滴(双联体/多联体)会导致混合的转录组,也就是两个或多个细胞共用一个barcode,称为doublets或multiplets(后面统称为doublets)。它是基于液滴的单细胞测序的技术副产品。双细胞会造成每个“细胞”的高UMI计数,改变cluster的细胞类型鉴定干扰到下游分析。这会导致对稀有细胞类型、中间细胞状态和疾病相关转录组学特征的人为错误发现。双细胞率已被证明与捕获的细胞数量成正比(Bloom 2018; Kang et al. 2018)。
双细胞可以分为同型(相同细胞类型)或异型(不同细胞类型)。
问题2: Cell Ranger可以自动剔除doublets和multiplets吗?
答:目前没有方法可以识别与双细胞中单个细胞相关的转录本信息。
10X官网对双细胞率的 相关答复 ,我们目前没有一种方法通过算法识别单个物种的单细胞基因表达数据,barcode是否包含多个细胞。
目前,Cell Ranger 软件仅在barnyard实验或多物种实验中估计双细胞率。
对此,10X也给出三条参考意见:
他的意思是:1) 通过已知细胞类型的marker基因来鉴别双细胞,比如T/B细胞的marker基因,在同一个barcode细胞中同时高表达就可判定为双细胞;2) seurat标准分析流程,质控环节通过UMI和gene指标过滤;3)运用scDblFinder双细胞预测软件。
问题3:10X Genomics 单细胞实验中估计的双细胞率是多少?
假设不存在细胞结团,可使用下表(取自 10X基因组学用户指南 )来估计单细胞实验中估计的双细胞率。
双细胞率(0.8%/1000cells),如果细胞数为1W,双细胞率为7.6%,约8%。
单细胞实验双细胞率为10-20%,这个数值明显高于上面10X给出的双细胞率(~8%)。这个怎么理解呢?
我想到的几个原因:
1)10X给出的是理想情况的双细胞率(细胞不结团),用标准样本做基准比较;
2)双细胞率跟实验环节中的样本处理和细胞上样量都有关系;
3)还取决如何计算双细胞,双细胞率=双细胞数目/总细胞数;因计算的细胞总体不同而不同;
我们一般会进行QC细胞质控,用UMI/genes指标过滤掉低质量细胞和异常值细胞;
如果QC细胞质控后计算双细胞率,和QC细胞质控前计算双细胞率,预测的双细胞率会不一致;
最近,在网上找到一个10x Genomics 提供的估计双细胞率是:
比如1W个细胞,双细胞率为:0.008*(10000/1000)=0.08=8%。
下面我们看看scDblFinder软件是如何具体执行的。
双细胞在单细胞测序数据中很普遍,可能会导致人为错误的发现。
目前,实验层面还是无法检测同一样本的细胞形成的双细胞,包括异型双细胞。
算法层,已经开发了许多计算方法来根据转录谱识别双细胞。大多数这些方法依赖于通过对真实细胞求和或求平均来生成人造双细胞,并对它们与真实细胞之间的相似性进行评分。 例如,DoubletFinder在真实细胞和人工双细胞的合集上生成k最近邻近图(kNN) ,并估计每个细胞附近人造双细胞的密度(McGinnis, Murrow, and Gartner 2019)。 以类似的方式,Bais和Kostka (2020) 提出的bcds算法和共表达评分cxds算法。
Xi 和 Li (2021a) 最近发表的文章中对双细胞检测方法进行基准测试,使用模拟数据和包含双细胞实验标记的真实数据数据集,发现DoubletFinder的算法最为准确。 但是,基准测试也发现,没有一种方法在所有数据集上都是系统性最优,强调在各种数据集上测试和基准测试方法的必要性,并表明某些算法在不同情况下可能具有优势和劣势。
没有一种算法是完美不缺的,特别是预测模型算法。
下图比较了scDblFinder 这个包中一些方法(以粗体显示)与其他方法:
因此,我们在单细胞转录组数据质控过滤时,会考虑到双细胞的因素,通过相关软件进行预测双细胞。常用的软件有 scDblFinder (R语言)和 Scrublet (python)。这里仅讨论scDblFinder。
安装scDblFinder需要满足R = 4.0 和 Bioconductor = 3.12
scDblFinder的输入数据是 SingleCellExperiment 对象(空的drops已经移出),至少要包含counts矩阵(assay ‘counts’)。即sce对象都不应该包含空滴,但不应该经过非常严格的过滤(这会影响双细胞率的估计)。
如果还包含归一化矩阵 (assay ‘logcounts’) 和PCA (reducedDim ‘PCA’),可以使用scDblFinder的cluster模式(不常见)。
对于 10x 数据,通常将dbr留空是安全的,它会自动估计。 scDblFinder的输出会在sce的colData中添加一些以‘scDblFinder’为前缀的列,其中最重要的是:
如果你有多个样本(理解为不同的细胞捕获),那么最好为每个样本分别进行双细胞识别(对于cell hashes实验中的多重样本,那意味着每个批次)。 可以通过简单地向scDblFinder的samples参数提供样本 id来完成,或者,将样本信息存储在colData列中,提供列名即可。另外,还可以考虑使用BPPARAM参数对其进行多线程处理(假设有足够的RAM)。 例如:
我们用之前案例中的数据测试下scDblFinder函数。
单个样本
该样本共10194个细胞,其中968个细胞被预测为双细胞,双细胞率为9.5%
那么该如何审视这个结果,选择怎样的参数?
我想到的是,对于一个预测模型来说,调参的意义不大,我们对结果没有预判,修改参数,都会出现不同的预测值。另外,对于一个常规的10X单细胞转录组数据,我们对双细胞率是有一定预判的,10X的实验步骤大致固定,cellranger的细胞数大致1W,双细胞率大致10%,我们知道预测的边界。我们有粗略的“标尺”。
但是现在,cellranger给出的细胞数是3W+,我们其实是不清楚双细胞率的边界,细胞数“超纲”了,只知道细胞数越多,双细胞也会越多。这类样本太少,我们没有横向可参考的实例。如果出现这种结果,最应该审视的是实验端出了什么问题,线粒体基因占比也非常高。
scDblFinder有两种生成人造双细胞的主要模式: 随机模式 (scDblFinder.random,clusters=FALSE, 默认方式)和 基于cluster的模式 (scDblFinder.clusters,clusters=TRUE 或提供你自定义的cluster - 以前版本的方法)。在实际中,我们观察到两种方法都表现良好(比其他方法要好)。当数据集被分成清晰的cluster时,教程建议使用基于cluster的方法(例如发展轨迹),否则使用随机模式。
双细胞分为 同型双细胞 ('Homotypic' doublets)和 异性双细胞 ( 'Heterotypic' doublets)。同型双细胞由相同类型的细胞(即相似的转录状态)组成,仅根据它们的转录组信息很难辨识。而且,它们对于大多数分析来说也相对无害,因为它们看起来与单细胞高度相似。 相反,异型双细胞(由具有不同转录状态的细胞形成)表现为一种人为的新型细胞类型,会影响下游分析。
scDblFinder只关注异性双细胞。
step1: 将数据集缩减到仅高表达的基因(默认为 1000); 如果使用基于cluster的方法,则会选择每个cluster的表达靠前的基因。另外使用基于cluster的方法(而不是人为指定cluster),将会执行快速聚类(请参阅fastcluster)。
step2: 通过组合不同cluster的细胞来创建人工双细胞,创建的人工双细胞数与cluster的数目成比例。 我们主要关注不同cluster间的双细胞,我们不会试图识别同型双细胞,无论如何,它们实际上无法识别且对下游分析相对无害。因此,我们减少了人工双细胞的必要数量, 也防止分类器被训练以识别与单细胞无法区分的细胞(因此将单细胞称为双细胞)。 scDblFinder另一种策略是生成完全随机的人工双细胞,并使用迭代程序从训练中排除无法识别的人工双细胞。 在实践中,这两种方法具有相当的性能,它们也可以结合使用。
step3: 然后对真实细胞和人工双细胞的合集进行降维,并生成最近邻网络。 接着使用网络来估计每个细胞的许多特征,特别是最近邻居中人工双细胞的比例。 该比率不是选择特定的邻域大小,该比率是在不同的 k 值下计算的,通过使用多个预测变量创建分类器。预测变量还包括距离加权比,进一步添加了的细胞层面上的预测变量:对主成分的预测;文库大小; 和共表达分数(基于Bais 和 Kostka2020 的变体)。 然后 scDblFinder训练梯度提升树( gradient boosted trees ),以根据这些特征区分来自真实细胞的人工双细胞。 最后,阈值程序通过同时最小化错误分类率和预期的双细胞率来决定调用细胞的分数(参见Thresholding)。
step4: 基于分类器方法的一个关键问题是一些真实细胞被错误标记,从某种意义上说,它们实际上是双细胞,但被标记为单细胞。这些会误导分类器。出于这个原因,分类和阈值处理以迭代方式执行:在每一轮中,从下一轮的训练集中删除识别为双细胞的真实细胞。
双细胞只能出现在给定的样本或某次捕获中,因此需要为每个样本单独进行双细胞判别,这也加快了分析速度。如果给定samples参数,scDblFinder将利用该参数将细胞拆分为单个样本/捕获,并在给出BPPARAM参数的情况下并行分析。分类器将在全局范围内进行训练,但阈值将在每个样本的基础上进行优化。如果你的样品是多标签,即不同的样品混合在不同的批次中,那么需要提供批次信息。
通过将数据集减少到仅高表达的基因(由nfeatures参数控制),可以大大加快分析速度,即使会稍微影响到准确度。然后,根据cluster参数,将执行最终的PCA和聚类(使用内部 fastcluster函数)。基于cluster方法的基本原理是同型双细胞几乎不可能根据它们的转录组进行区分,因此创建这种双细胞是一种计算资源的浪费,而且还会误导分类器标记为单细胞。
然而,另一种方法是随机生成双细胞(将clusters设置为 FALSE 或 NULL),并使用迭代方法从训练中排除无法识别的人工双细胞。
根据cluster和propRandom参数,将通过合并随机细胞和/或不相同的cluster对的细胞合并,形成人工双细胞(这可以使用getArtificialDoublets函数手动执行)。 一部分双细胞将简单地使用组成细胞的counts总和,而其余的将进行调整文库大小和进行泊松重采样,数据校正。
对真实细胞和人工细胞的组合执行新的PCA,从中生成 kNN网络。 使用这个 kNN,为每个细胞收集了许多参数,例如 KNN中双细胞的比例、到最近双细胞和最近非双细胞的距离之比等。在输出中报告了一些带有“scDblFinder.”前缀的功能,例如:
scDblFinder有相当多的参数来控制预处理、双细胞的生成、分类等(参见?scDblFinder)。 我们仅对重要参数进行说明。
双细胞的期望检出率对邻域中人造双细胞的密度没有影响,但会影响分类器的分数,特别是分类临界值。是通过dbr和dbr.sd参数指定(dbr.sd指定dbr周围的 +/- 范围,在该范围内与dbr的偏差将被视为空)。
对于10x数据,捕获的细胞越多,产生双细胞的概率越大,Chromium文档表明每1000个细胞捕获的双细胞率大约为 1%(因此对于 5000 个细胞,(0.01 5) 5000 = 250 个双细胞) ,scDblFinder默认的预期双细胞率将设置为0.1(默认标准偏差为 0.015)。但是请注意,不同的实验方案可能会产生更多的双细胞率,因此需要相应地更新。如果不确定双细胞率,您可能会考虑增加 dbr.sd,以便大多数/纯粹从错误分类错误中估计它。
那么你很可能有错误的双细胞率。 如果你没有提供dbr参数,双细胞率将使用10X Genomics预期双细胞率自动计算,这意味着捕获的细胞越多,双细胞率就越高。 如果你认为不适用于你的数据,可手动设置dbr。
如果出现意外高的双细胞率最常见原因是,1)你有一个多样本数据集并且没有按样本进行拆分。 scDblFinder会认为数据是具有大量细胞的单次捕获,因此具有非常高的双细胞率。 按样本拆分应该可以解决问题。
阈值根据预期双细胞数量和错误分类(即人造双细胞)试图最小化方差,这意味着有效(即最终)双细胞率将与给定的不同。 scDblFinder还认为假阳性比假阴性对后续的分析问题要小些。你可以通过设置 [dbr.sd=0]在一定程度上减少与输入双细胞率的偏差。
虽然这两种方法在基准测试中的表现非常相似,但随机生成方法在复杂数据集中通常略胜一筹。 如果你的数据被非常清晰地划分为cluster,或者你对双细胞的起源感兴趣,则基于cluster的方法更可取。 这也将能够更准确地计算同型双细胞率,因此略好于阈值法(thresholding)。 否则,特别是如果你的数据没有非常清楚地划分为cluster,则随机方法(例如clusters= FALSE)更可取。
如果你在多样本数据集上运行scDblFinder但是未提供cluster标签,而是基于特定样本的标签(意味着一个样本中的标签“1”可能与另一个样本中的标签“1”无关),并且 在 tSNE上绘制它们看起来没有意义。 出于这个原因,当运行多个样本时,建议首先将所有样本聚集在一起(例如使用 sce$cluster - fastcluster(sce)),然后将cluster信息提供给 scDblFinder。
如果某些细胞的读数为零(或非常接近于零),则会出现‘Size factors should be positive’此错误。 过滤掉这些细胞后,错误应该消失了。 但是请注意,我们建议在运行 scDblFinder之前不要进行过于严格的过滤。
由于它依赖于人工双细胞的部分随机生成,因此对同一数据多次运行scDblFinder会产生略有不同的结果。你可以使用 set.seed() 确保可重复性,但是在多线程时使用 set.seed() 是不行的。请使用以下程序:
如果输入的sce对象已经包含归一化矩阵( logcounts)或名为“PCA”的reducedDim数据,scDblFinder将使用它们进行聚类分析。 此外,可以使用 scDblFinder() 函数的 cluster参数手动指定。 通过这种方式,seurat聚类可以例如用于创建人造双细胞(参见 ?Seurat::as.SingleCellExperiment.Seurat for conversion to SCE)。
在人造双细胞生成之后,真实和人造双细胞的计数必须一起重新处理(即归一化和 PCA),在内部使用scater执行的。 如果您希望以不同的方式执行此步骤,您可以提供自定义函数来执行此操作(请参阅 ?scDblFinder 中的处理参数)。 然而,我们注意到,这一步的变化对双细胞检测的影响不大。 事实上,例如,根本不执行任何归一化会降低双峰识别的准确性,但也是一点点。
可以,专门处理峰值数据。由于单细胞ATAC-seq数据的稀疏性比转录组要大得多,而且scDblFinder需要处理一系列基因,因此使用默认的标准参数,运行的性能较差(执行慢)。因此,我们推荐使用aggregateFeatures=TRUE,这将在正常的scDblFinder 过程之前聚合相似的基因(而不是选择基因),会产生不错的结果。如果基因足够少,我们推荐直接基于距离计算而不是通过SVD步骤获得,如下所示:
cDblFinder的输入数据不应包含空液滴,并且可能需要移除覆盖率非常低的细胞以避免错误(例如 200 reads)。
进一步的细胞质控应该在双细胞识别的下游进行,有两个理由:
1.默认的预期双细胞率是根据给定的细胞计算的,如果你排除了很多质量低的细胞,scDblFinder可能会认为双细胞率应该低于实际值。
2.剔除所有低质量细胞可能会妨碍我们检测由高质量细胞和低质量细胞组合形成的双细胞的能力。
话虽如此,这些主要是理论依据,除非你的QC过滤非常严格(而且不应该如此!),否则结果不太可能有很大的不同。
scDblFinder运行之前要做一些初次过滤,但不要太严格(例如 200 UMI)
质控粗略过滤-运行scDblFinder-较严格过滤
后记:
之前在群里看到有人问,双细胞质控后,拿质控后的数据重新跑scDblFinder,还是会有大量双细胞被检出?
这个是必然的,由scDblFinder的算法决定,它是由输入数据建立起预测分类模型,不像singleR有另外一套参考数据集,拿输入数据去map到参考数据集。
只要你喂给scDblFinder数据,它都会给输入的细胞一个score值,然后设置阈值进行分类,值高的为双细胞。
scDblFinder强烈依赖输入数据,有它使用的范畴,所以反复进行双细胞scDblFinder质控,用法是不对的。
拿到单细胞转录组数据,先质控粗略过滤-运行scDblFinder进行双细胞质控-较严格质控过滤。
其实最好有一个相互验证的过程。
PYTHON!!!!!!!!!!!!!!!!!!!!!!!
很简单啊。说下思路
首先打开文件,使用open函数,或者with语句,在网上搜“python 文件”就能搜到
然后把内容读到列表中,就用read函数,前面搜出来的网页中应该就有说明
count的话,直接有len函数就能得到,参数使用前面读到的列表
sum的话,直接sum函数,参数也是刚才的列表
average的话,sum除以count就完了
关于python的几个小编程 急!
注意:只能在脚本下运行,要直接在IDE下运行需要做修改。
#Q1
text=raw_input("Type in a line of text:")
punctuation = ['(', ')', '?', ':', ';', ',', '.', '!', '/', '"', "'"]
convers_to_list=list(text) #将字符串转换成list类型
new_list=[] #存放去掉标点符号后的字符
def move_pun():
for i in convers_to_list:
if i not in punctuation: #不在punctuation中的字符
new_list.append(i) #放进new_list中
new_string=''.join(new_list) #转换成string
print convers_to_list
print new_string
#运行
move_pun()
________________________________
#Q2
string=raw_input("Enter a string:")
length=len(string)
list_of_string=list(string) #将输入的字符串转换成列表,以便前后比较
def palindrome():
if length==1: #如果输入只有一个字符,也把它当做回文
print "Palindrome? True"
return
for i in range(length/2):
if list_of_string[i]!=list_of_string[length-1-i]: #前后对比,如果不相同就不是回文,退出
print "Palindrome? False"
return
else:
if i==length/2-1: #直到比较到字符串的中间位置前后都相同,可以判断是回文
print "Palindrome? True"
#运行
palindrome()
————————————————————————
#Q3
sentence=raw_input("Type in a sentence:")
punctuation = ['(', ')', '?', ':', ';', ',', '.', '!', '/', '"', "'",' ']
convers_to_list=list(sentence)
sentence_no_pun=[]
#将输入经过第一、第二个程序的处理即可
def sentence_palindrome():
for i in convers_to_list:
if i not in punctuation:
sentence_no_pun.append(i)
print_sentence=''.join(sentence_no_pun).lower() #把list转换成string再全部转换成小写
length=len(print_sentence)
print print_sentence
if length==1:
print "Palindrome? True"
return
for i in range(length/2):
if print_sentence[i]!=print_sentence[length-1-i]:
print "Palindrome? False"
return
else:
if i==length/2-1:
print "Palindrome? True"
#运行
sentence_palindrome()