#!/usr/bin/env python import logging, optparse, gzip from sys import stdin, stdout, stderr, exit, modules from os.path import basename, join try: from urllib.request import urlopen # py2 except ImportError: from urllib2 import urlopen # py3 try: from cStringIO import StringIO # py2 except ImportError: from io import BytesIO # py3 # ==== functions ===== def parseArgs(): " setup logging, parse command line arguments and options. -h shows auto-generated help page " parser = optparse.OptionParser("""usage: %prog [options] filename - change NCBI or Ensembl chromosome names to UCSC names in tabular or wiggle files, using a chromAlias table. Supports these UCSC file formats: BED, genePred, PSL, wiggle (all formats), bedGraph, VCF, SAM, GTF, Chain ... or any other csv or tsv format where the sequence (chromosome) name is a separate field. Requires a .chromAlias.tsv file which can be downloaded like this: %prog --get hg19 # download the file hg19.chromAlias.tsv into current directory Which also works for GenArk assemblies and can take an output directory: %prog --get GCF_000001735.3 -o /tmp/ # for GenArk assemblies, will translate to NCBI sequence names (accessions) If you do not want to use the --get option to retrieve the mapping tables, you can also download the alias mapping files yourself, e.g. for mm10 with 'wget https://hgdownload.soe.ucsc.edu/goldenPath/mm10/database/chromAlias.txt.gz' Then the script can be run like this: %prog -i in.bed -o out.bed -a hg19.chromAlias.tsv %prog -i in.bed -o out.bed -a https://hgdownload.soe.ucsc.edu/goldenPath/hg19/database/chromAlias.txt.gz Or in pipes, like this: cat test.bed | %prog -a mm10.chromAlias.tsv > test.ucsc.bed For BAM files use this program in a pipe with samtools: samtools view -h in.bam | ./chromToUcsc -a mm10.chromAlias.tsv | samtools -bS > out.bam By default, this script expects the chromosome name in the first field. The default works for BED, bedGraph, GTF, wiggle, VCF. For the following file formats, you will need to set the -k option to these values manually: genePred: 2 -- PSL: 10 (query) or 14 (target) -- chain: 2 (target) or 7 (query) -- SAM: 2 (If a line starts with @ (SAM format), -k is automatically set to 2.) """) parser.add_option("", "--get", dest="downloadDb", action="store", help="download a chrom alias table from UCSC for the genomeDb into the current directory or directory provided by -o and exit") parser.add_option("-a", "--chromAlias", dest="aliasFname", action="store", help="a UCSC chromAlias file in tab-sep format or the http/https URL to one") parser.add_option("-i", "--in", dest="inFname", action="store", help="input filename, default: /dev/stdin") parser.add_option("-o", "--out", dest="outFname", action="store", help="output filename, default: /dev/stdout") parser.add_option("-d", "--debug", dest="debug", action="store_true", help="show debug messages") parser.add_option("-s", "--skipUnknown", dest="skipUnknown", action="store_true", help="skip unknown sequence rather than generate an error.") parser.add_option("-k", "--field", dest="fieldNo", action="store", type="int", \ help="Index of field (1-based) that contains the chromosome name. No other field is touched by this program, unless the " "SAM format is detected. Default is %default (first field).", default=1) (options, args) = parser.parse_args() if options.downloadDb is None and options.aliasFname is None: parser.print_help() exit(1) if options.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) return args, options # ----------- main -------------- def splitLines(ifh): " yield (chromName, restOfLine) for all lines of ifh " sep = -1 lineNo = 0 for line in ifh: lineNo += 1 if line.startswith("#"): yield lineNo, None, None, line.rstrip("\n\r") else: if line.startswith("fixedStep") or line.startswith("variableStep"): row = line.split() yield lineNo, " ", row, line # *step lines are always space-separated else: if sep==-1: # why test first for space? Because after fixedStep, the lines are just single numbers if "\t" in line: sep = "\t" elif " " in line: sep = None # = split on any whitespace, consec. whitespc counts as one else: sep = "\t" # default is to split on tab row = line.rstrip("\n\r").split(sep) if sep is None: sep = " " yield lineNo, sep, row, line def parseNewAlias(ifh): " IGV-compatible format: first is UCSC, all other columns are aliases " toUcsc = {} for line in ifh: if line.startswith("#"): continue row = line.rstrip("\n").split("\t") for i in range(1, len(row)): toUcsc[row[i]] = row[0] return toUcsc def parseAlias(fname): " parse tsv file with at least two columns, orig chrom name and new chrom name " logging.debug("alias file is in IGV-format") toUcsc = {} if fname.startswith("http://") or fname.startswith("https://"): data = downloadUrl(fname) ifh = data.splitlines() elif fname.endswith(".gz"): ifh = gzip.open(fname, "rt") else: ifh = open(fname) firstLine = True for line in ifh: if line.startswith("#") and firstLine: return parseNewAlias(ifh) if line.startswith("alias"): continue row = line.rstrip("\n").split("\t") toUcsc[row[0]] = row[1] firstLine = False return toUcsc def handledUnmappedChrom(chrom, skipUnknown, skipWarned, message): "either generate an error or warning when an unknown chromosome is encountered." if skipUnknown: if chrom not in skipWarned: logging.warning(message) skipWarned.add(chrom) else: logging.error(message) exit(1) def chromToUcsc(aliasFname, fieldIdx, skipUnknown, ifh, ofh): " convert column number fieldIdx to UCSC-style chrom names " toUcsc = parseAlias(aliasFname) skipWarned = set() ucscChroms = set(toUcsc.values()) isSam = False for lineNo, sep, row, line in splitLines(ifh): if line.startswith("#"): ofh.write(line) # comments (e.g. gtf) are just passed through ofh.write("\n") continue elif len(row)<=2: pass # fixedStep or variableStep value lines are passed through elif row[0]=="fixedStep" or row[0]=="variableStep": # fixedStep chrom=NC_033660.1 start=15778865 step=1 key, chrom = row[1].split("=") assert(key=="chrom") # we assume that the first field has the chrom, that saves a lot of time ucscChrom = toUcsc.get(chrom) if ucscChrom is None: logging.error("line %d: chrom name %s is not in chromAlias table" % (lineNo, repr(chrom))) exit(1) row[1] = "chrom="+ucscChrom elif row[0][0]== "@": fieldIdx = 2 # SAM format detected isSam = True if row[0]=="@SQ": # @SQ SN:1 LN:195471971 chrom = row[1].split(":")[1] if chrom not in ucscChroms: ucscChrom = toUcsc.get(chrom) if ucscChrom is None: handledUnmappedChrom(chrom, skipUnknown, skipWarned, "SAM format, header line %d: chrom name %s is not in chromAlias table" % (lineNo, repr(chrom))) continue row[1] = "SN:%s" % ucscChrom else: # just pass through any UCSC chrom names chrom = row[fieldIdx] if row[0].startswith("#") or chrom in ucscChroms or chrom=="*": ucscChrom = chrom else: ucscChrom = toUcsc.get(chrom) if ucscChrom is None: handledUnmappedChrom(chrom, skipUnknown, skipWarned, "line %d: chrom name %s is not in chromAlias table" % (lineNo, repr(chrom))) continue if isSam: mateChrom = row[6] if mateChrom not in ("=", "*"): row[6] = toUcsc[mateChrom] row[fieldIdx] = ucscChrom line = sep.join(row) ofh.write(line) ofh.write("\n") def downloadUrl(url): """ download URL and return as string. gzip OK. Supporting both py2 and py3 here makes this method more complicated that one would expect. """ data = urlopen(url).read() if url.endswith(".gz"): if "decompress" in dir(gzip): # py3 data = gzip.decompress(data) else: data = gzip.GzipFile(fileobj=StringIO(data)).read() # py2 if isinstance(data, bytes): # urlopen returns 'bytes' on py3 data = data.decode("latin1") return data def download(db, outDir): " download chromAlias file from UCSC " # Genark assemblies are in a different directory of the download server if "_" in db: p1 = db[0:3] p2 = db[4:7] p3 = db[7:10] p4 = db[10:13] url = "https://hgdownload.soe.ucsc.edu/hubs/%s/%s/%s/%s/%s/%s.chromAlias.txt" % (p1, p2, p3, p4, db, db) elif db in ["hg38"]: # hg38 has been patched a few times, assume that the user wants the latest chromAlias file url="https://hgdownload.soe.ucsc.edu/goldenPath/%s/bigZips/latest/%s.chromAlias.txt" % (db, db) else: url = "https://hgdownload.soe.ucsc.edu/goldenPath/%s/database/chromAlias.txt.gz" % db data = downloadUrl(url) if outDir is None: outDir = "." outFname = join(outDir, db+".chromAlias.tsv") open(outFname, "w").write(data) print("Wrote %s to %s" % (url, outFname)) print("You can now convert a file with 'chromToUcsc -a %s -i infile.bed -o outfile.bed'" % outFname) exit(0) def main(): args, options = parseArgs() aliasFname = options.aliasFname inFname = options.inFname outFname = options.outFname skipUnknown = options.skipUnknown if options.downloadDb: download(options.downloadDb, outFname) if aliasFname is None: logging.error("You need to provide an alias table with the -a option or use --get to download one.") exit(1) if inFname is None: ifh = stdin elif inFname.endswith(".gz"): ifh = gzip.open(inFname, "rt") else: ifh = open(inFname) if outFname is None: ofh = stdout elif outFname.endswith(".gz"): ofh = gzip.open(outFname, "wt") else: ofh = open(outFname, "w") fieldIdx = options.fieldNo-1 chromToUcsc(aliasFname, fieldIdx, skipUnknown, ifh, ofh) main()