You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

892 lines
31 KiB

#!/usr/bin/env python3
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sync files from/to an Android device."""
from __future__ import unicode_literals
import argparse
import locale
import logging
import os
import re
import stat
import subprocess
import time
from types import TracebackType
from typing import Callable, cast, Dict, List, IO, Iterable, Optional, Tuple, Type
class OSLike(object):
def listdir(self, path: bytes) -> Iterable[bytes]: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def lstat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def stat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def unlink(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def rmdir(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def makedirs(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
def utime(self, path: bytes, times: Tuple[float, float]) -> None: # os's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
class GlobLike(object):
def glob(self, path: bytes) -> Iterable[bytes]: # glob's name, so pylint: disable=g-bad-name
raise NotImplementedError('Abstract')
class Stdout(object):
def __init__(self, args: List[bytes]) -> None:
"""Closes the process's stdout when done.
Usage:
with Stdout(...) as stdout:
DoSomething(stdout)
Args:
args: Which program to run.
Returns:
An object for use by 'with'.
"""
self.popen = subprocess.Popen(args, stdout=subprocess.PIPE)
def __enter__(self) -> IO:
return self.popen.stdout
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_val: Optional[Exception],
exc_tb: Optional[TracebackType]) -> bool:
self.popen.stdout.close()
if self.popen.wait() != 0:
raise OSError('Subprocess exited with nonzero status.')
return False
class AdbFileSystem(GlobLike, OSLike):
"""Mimics os's file interface but uses the adb utility."""
def __init__(self, adb: List[bytes]) -> None:
self.stat_cache = {} # type: Dict[bytes, os.stat_result]
self.adb = adb
# Regarding parsing stat results, we only care for the following fields:
# - st_size
# - st_mtime
# - st_mode (but only about S_ISDIR and S_ISREG properties)
# Therefore, we only capture parts of 'ls -l' output that we actually use.
# The other fields will be filled with dummy values.
LS_TO_STAT_RE = re.compile(
br"""^
(?:
(?P<S_IFREG> -) |
(?P<S_IFBLK> b) |
(?P<S_IFCHR> c) |
(?P<S_IFDIR> d) |
(?P<S_IFLNK> l) |
(?P<S_IFIFO> p) |
(?P<S_IFSOCK> s))
[-r][-w][-xsS]
[-r][-w][-xsS]
[-r][-w][-xtT] # Mode string.
[ ]+
(?:
[0-9]+ # number of hard links
[ ]+
)?
[^ ]+ # User name/ID.
[ ]+
[^ ]+ # Group name/ID.
[ ]+
(?(S_IFBLK) [^ ]+[ ]+[^ ]+[ ]+) # Device numbers.
(?(S_IFCHR) [^ ]+[ ]+[^ ]+[ ]+) # Device numbers.
(?(S_IFDIR) [0-9]+ [ ]+)? # directory Size.
(?(S_IFREG)
(?P<st_size> [0-9]+) # Size.
[ ]+)
(?P<st_mtime>
[0-9]{4}-[0-9]{2}-[0-9]{2} # Date.
[ ]
[0-9]{2}:[0-9]{2}) # Time.
[ ]
# Don't capture filename for symlinks (ambiguous).
(?(S_IFLNK) .* | (?P<filename> .*))
$""", re.DOTALL | re.VERBOSE)
def LsToStat(self, line: bytes) -> Tuple[os.stat_result, bytes]:
"""Convert a line from 'ls -l' output to a stat result.
Args:
line: Output line of 'ls -l' on Android.
Returns:
os.stat_result for the line.
Raises:
OSError: if the given string is not a 'ls -l' output line (but maybe an
error message instead).
"""
match = self.LS_TO_STAT_RE.match(line)
if match is None:
logging.error('Could not parse %r.', line)
raise OSError('Unparseable ls -al result.')
groups = match.groupdict()
# Get the values we're interested in.
st_mode = ( # 0755
stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH
| stat.S_IXOTH)
if groups['S_IFREG']:
st_mode |= stat.S_IFREG
if groups['S_IFBLK']:
st_mode |= stat.S_IFBLK
if groups['S_IFCHR']:
st_mode |= stat.S_IFCHR
if groups['S_IFDIR']:
st_mode |= stat.S_IFDIR
if groups['S_IFIFO']:
st_mode |= stat.S_IFIFO
if groups['S_IFLNK']:
st_mode |= stat.S_IFLNK
if groups['S_IFSOCK']:
st_mode |= stat.S_IFSOCK
st_size = None if groups['st_size'] is None else int(groups['st_size'])
st_mtime = int(
time.mktime(
time.strptime(
match.group('st_mtime').decode('ascii'), '%Y-%m-%d %H:%M')))
# Fill the rest with dummy values.
st_ino = 1
st_rdev = 0
st_nlink = 1
st_uid = -2 # Nobody.
st_gid = -2 # Nobody.
st_atime = st_ctime = st_mtime
stbuf = os.stat_result((st_mode, st_ino, st_rdev, st_nlink, st_uid, st_gid,
st_size, st_atime, st_mtime, st_ctime))
filename = groups['filename']
return stbuf, filename
def QuoteArgument(self, arg: bytes) -> bytes:
# Quotes an argument for use by adb shell.
# Usually, arguments in 'adb shell' use are put in double quotes by adb,
# but not in any way escaped.
arg = arg.replace(b'\\', b'\\\\')
arg = arg.replace(b'"', b'\\"')
arg = arg.replace(b'$', b'\\$')
arg = arg.replace(b'`', b'\\`')
arg = b'"' + arg + b'"'
return arg
def IsWorking(self) -> bool:
"""Tests the adb connection."""
# This string should contain all possible evil, but no percent signs.
# Note this code uses 'date' and not 'echo', as date just calls strftime
# while echo does its own backslash escape handling additionally to the
# shell's. Too bad printf "%s\n" is not available.
test_strings = [
b'(', b'(; #`ls`$PATH\'"(\\\\\\\\){};!\xc0\xaf\xff\xc2\xbf'
]
for test_string in test_strings:
good = False
with Stdout(self.adb +
[b'shell',
b'date +%s' % (self.QuoteArgument(test_string),)]) as stdout:
for line in stdout:
line = line.rstrip(b'\r\n')
if line == test_string:
good = True
if not good:
return False
return True
def listdir(self, path: bytes) -> Iterable[bytes]: # os's name, so pylint: disable=g-bad-name
"""List the contents of a directory, caching them for later lstat calls."""
with Stdout(self.adb +
[b'shell',
b'ls -al %s' % (self.QuoteArgument(path + b'/'),)]) as stdout:
for line in stdout:
if line.startswith(b'total '):
continue
line = line.rstrip(b'\r\n')
try:
statdata, filename = self.LsToStat(line)
except OSError:
continue
if filename is None:
logging.error('Could not parse %r.', line)
else:
self.stat_cache[path + b'/' + filename] = statdata
yield filename
def lstat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name
"""Stat a file."""
if path in self.stat_cache:
return self.stat_cache[path]
with Stdout(
self.adb +
[b'shell', b'ls -ald %s' % (self.QuoteArgument(path),)]) as stdout:
for line in stdout:
if line.startswith(b'total '):
continue
line = line.rstrip(b'\r\n')
statdata, _ = self.LsToStat(line)
self.stat_cache[path] = statdata
return statdata
raise OSError('No such file or directory')
def stat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name
"""Stat a file."""
if path in self.stat_cache and not stat.S_ISLNK(
self.stat_cache[path].st_mode):
return self.stat_cache[path]
with Stdout(
self.adb +
[b'shell', b'ls -aldL %s' % (self.QuoteArgument(path),)]) as stdout:
for line in stdout:
if line.startswith(b'total '):
continue
line = line.rstrip(b'\r\n')
statdata, _ = self.LsToStat(line)
self.stat_cache[path] = statdata
return statdata
raise OSError('No such file or directory')
def unlink(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
"""Delete a file."""
if subprocess.call(
self.adb + [b'shell', b'rm %s' % (self.QuoteArgument(path),)]) != 0:
raise OSError('unlink failed')
def rmdir(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
"""Delete a directory."""
if subprocess.call(
self.adb +
[b'shell', b'rmdir %s' % (self.QuoteArgument(path),)]) != 0:
raise OSError('rmdir failed')
def makedirs(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name
"""Create a directory."""
if subprocess.call(
self.adb +
[b'shell', b'mkdir -p %s' % (self.QuoteArgument(path),)]) != 0:
raise OSError('mkdir failed')
def utime(self, path: bytes, times: Tuple[float, float]) -> None:
# TODO(rpolzer): Find out why this does not work (returns status 255).
"""Set the time of a file to a specified unix time."""
atime, mtime = times
timestr = time.strftime('%Y%m%d.%H%M%S',
time.localtime(mtime)).encode('ascii')
if subprocess.call(
self.adb +
[b'shell',
b'touch -mt %s %s' % (timestr, self.QuoteArgument(path))]) != 0:
raise OSError('touch failed')
timestr = time.strftime('%Y%m%d.%H%M%S',
time.localtime(atime)).encode('ascii')
if subprocess.call(
self.adb +
[b'shell',
b'touch -at %s %s' % (timestr, self.QuoteArgument(path))]) != 0:
raise OSError('touch failed')
def glob(self, path: bytes) -> Iterable[bytes]: # glob's name, so pylint: disable=g-bad-name
with Stdout(
self.adb +
[b'shell', b'for p in %s; do echo "$p"; done' % (path,)]) as stdout:
for line in stdout:
yield line.rstrip(b'\r\n')
def Push(self, src: bytes, dst: bytes) -> None:
"""Push a file from the local file system to the Android device."""
if subprocess.call(self.adb + [b'push', src, dst]) != 0:
raise OSError('push failed')
def Pull(self, src: bytes, dst: bytes) -> None:
"""Pull a file from the Android device to the local file system."""
if subprocess.call(self.adb + [b'pull', src, dst]) != 0:
raise OSError('pull failed')
def BuildFileList(fs: OSLike, path: bytes, follow_links: bool,
prefix: bytes) -> Iterable[Tuple[bytes, os.stat_result]]:
"""Builds a file list.
Args:
fs: File system provider (can be os or AdbFileSystem()).
path: Initial path.
follow_links: Whether to follow symlinks while iterating. May recurse
endlessly.
prefix: Path prefix for output file names.
Yields:
File names from path (prefixed by prefix).
Directories are yielded before their contents.
"""
try:
if follow_links:
statresult = fs.stat(path)
else:
statresult = fs.lstat(path)
except OSError:
return
if stat.S_ISDIR(statresult.st_mode):
yield prefix, statresult
try:
files = fs.listdir(path)
except OSError:
return
for n in files:
if n == b'.' or n == b'..':
continue
for t in BuildFileList(fs, path + b'/' + n, follow_links,
prefix + b'/' + n):
yield t
elif stat.S_ISREG(statresult.st_mode):
yield prefix, statresult
elif stat.S_ISLNK(statresult.st_mode) and not follow_links:
yield prefix, statresult
else:
logging.info('Unsupported file: %r.', path)
def DiffLists(a: Iterable[Tuple[bytes, os.stat_result]],
b: Iterable[Tuple[bytes, os.stat_result]]
) -> Tuple[List[Tuple[bytes, os.stat_result]], List[
Tuple[bytes, os.stat_result, os
.stat_result]], List[Tuple[bytes, os.stat_result]]]:
"""Compares two lists.
Args:
a: the first list.
b: the second list.
Returns:
a_only: the items from list a.
both: the items from both list, with the remaining tuple items combined.
b_only: the items from list b.
"""
a_only = [] # type: List[Tuple[bytes, os.stat_result]]
b_only = [] # type: List[Tuple[bytes, os.stat_result]]
both = [] # type: List[Tuple[bytes, os.stat_result, os.stat_result]]
a_revlist = sorted(a)
a_revlist.reverse()
b_revlist = sorted(b)
b_revlist.reverse()
while True:
if not a_revlist:
b_only.extend(reversed(b_revlist))
break
if not b_revlist:
a_only.extend(reversed(a_revlist))
break
a_item = a_revlist[len(a_revlist) - 1]
b_item = b_revlist[len(b_revlist) - 1]
if a_item[0] == b_item[0]:
both.append((a_item[0], a_item[1], b_item[1]))
a_revlist.pop()
b_revlist.pop()
elif a_item[0] < b_item[0]:
a_only.append(a_item)
a_revlist.pop()
elif a_item[0] > b_item[0]:
b_only.append(b_item)
b_revlist.pop()
else:
raise
return a_only, both, b_only
class DeleteInterruptedFile(object):
def __init__(self, dry_run: bool, fs: OSLike, name: bytes) -> None:
"""Sets up interrupt protection.
Usage:
with DeleteInterruptedFile(False, fs, name):
DoSomething()
If DoSomething() should get interrupted, the file 'name' will be deleted.
The exception otherwise will be passed on.
Args:
dry_run: If true, we don't actually delete.
fs: File system object.
name: File name to delete.
Returns:
An object for use by 'with'.
"""
self.dry_run = dry_run
self.fs = fs
self.name = name
def __enter__(self) -> None:
pass
def __exit__(self, exc_type: Optional[Type[BaseException]],
exc_val: Optional[Exception],
exc_tb: Optional[TracebackType]) -> bool:
if exc_type is not None:
logging.info('Interrupted-%s-Delete: %r',
'Pull' if self.fs == os else 'Push', self.name)
if not self.dry_run:
self.fs.unlink(self.name)
return False
class FileSyncer(object):
"""File synchronizer."""
def __init__(self, adb: AdbFileSystem, local_path: bytes, remote_path: bytes,
local_to_remote: bool, remote_to_local: bool,
preserve_times: bool, delete_missing: bool,
allow_overwrite: bool, allow_replace: bool, copy_links: bool,
dry_run: bool) -> None:
self.local = local_path
self.remote = remote_path
self.adb = adb
self.local_to_remote = local_to_remote
self.remote_to_local = remote_to_local
self.preserve_times = preserve_times
self.delete_missing = delete_missing
self.allow_overwrite = allow_overwrite
self.allow_replace = allow_replace
self.copy_links = copy_links
self.dry_run = dry_run
self.num_bytes = 0
self.start_time = time.time()
# Attributes filled in later.
local_only = None # type: List[Tuple[bytes, os.stat_result]]
both = None # type: List[Tuple[bytes, os.stat_result, os.stat_result]]
remote_only = None # type: List[Tuple[bytes, os.stat_result]]
src_to_dst = None # type: Tuple[bool, bool]
dst_to_src = None # type: Tuple[bool, bool]
src_only = None # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]]
dst_only = None # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]]
src = None # type: Tuple[bytes, bytes]
dst = None # type: Tuple[bytes, bytes]
dst_fs = None # type: Tuple[OSLike, OSLike]
push = None # type: Tuple[str, str]
copy = None # type: Tuple[Callable[[bytes, bytes], None], Callable[[bytes, bytes], None]]
def IsWorking(self) -> bool:
"""Tests the adb connection."""
return self.adb.IsWorking()
def ScanAndDiff(self) -> None:
"""Scans the local and remote locations and identifies differences."""
logging.info('Scanning and diffing...')
locallist = BuildFileList(
cast(OSLike, os), self.local, self.copy_links, b'')
remotelist = BuildFileList(self.adb, self.remote, self.copy_links, b'')
self.local_only, self.both, self.remote_only = DiffLists(
locallist, remotelist)
if not self.local_only and not self.both and not self.remote_only:
logging.warning('No files seen. User error?')
self.src_to_dst = (self.local_to_remote, self.remote_to_local)
self.dst_to_src = (self.remote_to_local, self.local_to_remote)
self.src_only = (self.local_only, self.remote_only)
self.dst_only = (self.remote_only, self.local_only)
self.src = (self.local, self.remote)
self.dst = (self.remote, self.local)
self.dst_fs = (self.adb, cast(OSLike, os))
self.push = ('Push', 'Pull')
self.copy = (self.adb.Push, self.adb.Pull)
def PerformDeletions(self) -> None:
"""Perform all deleting necessary for the file sync operation."""
if not self.delete_missing:
return
for i in [0, 1]:
if self.src_to_dst[i] and not self.dst_to_src[i]:
if not self.src_only[i] and not self.both:
logging.error('Cowardly refusing to delete everything.')
else:
for name, s in reversed(self.dst_only[i]):
dst_name = self.dst[i] + name
logging.info('%s-Delete: %r', self.push[i], dst_name)
if stat.S_ISDIR(s.st_mode):
if not self.dry_run:
self.dst_fs[i].rmdir(dst_name)
else:
if not self.dry_run:
self.dst_fs[i].unlink(dst_name)
del self.dst_only[i][:]
def PerformOverwrites(self) -> None:
"""Delete files/directories that are in the way for overwriting."""
src_only_prepend = (
[], []
) # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]]
for name, localstat, remotestat in self.both:
if stat.S_ISDIR(localstat.st_mode) and stat.S_ISDIR(remotestat.st_mode):
# A dir is a dir is a dir.
continue
elif stat.S_ISDIR(localstat.st_mode) or stat.S_ISDIR(remotestat.st_mode):
# Dir vs file? Nothing to do here yet.
pass
else:
# File vs file? Compare sizes.
if localstat.st_size == remotestat.st_size:
continue
l2r = self.local_to_remote
r2l = self.remote_to_local
if l2r and r2l:
# Truncate times to full minutes, as Android's "ls" only outputs minute
# accuracy.
localminute = int(localstat.st_mtime / 60)
remoteminute = int(remotestat.st_mtime / 60)
if localminute > remoteminute:
r2l = False
elif localminute < remoteminute:
l2r = False
if l2r and r2l:
logging.warning('Unresolvable: %r', name)
continue
if l2r:
i = 0 # Local to remote operation.
src_stat = localstat
dst_stat = remotestat
else:
i = 1 # Remote to local operation.
src_stat = remotestat
dst_stat = localstat
dst_name = self.dst[i] + name
logging.info('%s-Delete-Conflicting: %r', self.push[i], dst_name)
if stat.S_ISDIR(localstat.st_mode) or stat.S_ISDIR(remotestat.st_mode):
if not self.allow_replace:
logging.info('Would have to replace to do this. '
'Use --force to allow this.')
continue
if not self.allow_overwrite:
logging.info('Would have to overwrite to do this, '
'which --no-clobber forbids.')
continue
if stat.S_ISDIR(dst_stat.st_mode):
kill_files = [
x for x in self.dst_only[i] if x[0][:len(name) + 1] == name + b'/'
]
self.dst_only[i][:] = [
x for x in self.dst_only[i] if x[0][:len(name) + 1] != name + b'/'
]
for l, s in reversed(kill_files):
if stat.S_ISDIR(s.st_mode):
if not self.dry_run:
self.dst_fs[i].rmdir(self.dst[i] + l)
else:
if not self.dry_run:
self.dst_fs[i].unlink(self.dst[i] + l)
if not self.dry_run:
self.dst_fs[i].rmdir(dst_name)
elif stat.S_ISDIR(src_stat.st_mode):
if not self.dry_run:
self.dst_fs[i].unlink(dst_name)
else:
if not self.dry_run:
self.dst_fs[i].unlink(dst_name)
src_only_prepend[i].append((name, src_stat))
for i in [0, 1]:
self.src_only[i][:0] = src_only_prepend[i]
def PerformCopies(self) -> None:
"""Perform all copying necessary for the file sync operation."""
for i in [0, 1]:
if self.src_to_dst[i]:
for name, s in self.src_only[i]:
if b':' in name:
logging.info('Found : in name. Replacing with _ to play nice with android. Original name is: %s',name)
newname = name[:name.find(b':')] + b'_' + name[name.find(b':')+1:]
while b':' in newname:
logging.info('Found : in name. Replacing with _ to play nice with android. Original name is: %s',name)
newname = newname[:newname.find(b':')] + b'_' + newname[newname.find(b':')+1:]
src_name = self.src[i] + name #if name has ':', replace with '_'
if b':' in name:
dst_name = self.dst[i] + newname
else:
dst_name = self.dst[i] + name
logging.info('%s: %r', self.push[i], dst_name)
if stat.S_ISDIR(s.st_mode):
if not self.dry_run:
self.dst_fs[i].makedirs(dst_name)
else:
with DeleteInterruptedFile(self.dry_run, self.dst_fs[i], dst_name):
if not self.dry_run:
self.copy[i](src_name, dst_name)
if stat.S_ISREG(s.st_mode):
self.num_bytes += s.st_size
if not self.dry_run:
if self.preserve_times:
logging.info('%s-Times: accessed %s, modified %s', self.push[i],
time.asctime(time.localtime(s.st_atime)),
time.asctime(time.localtime(s.st_mtime)))
self.dst_fs[i].utime(dst_name, (s.st_atime, s.st_mtime))
def TimeReport(self) -> None:
"""Report time and amount of data transferred."""
if self.dry_run:
logging.info('Total: %d bytes', self.num_bytes)
else:
end_time = time.time()
dt = end_time - self.start_time
rate = self.num_bytes / 1024.0 / dt
logging.info('Total: %d KB/s (%d bytes in %.3fs)', rate, self.num_bytes,
dt)
def ExpandWildcards(globber: GlobLike, path: bytes) -> Iterable[bytes]:
if path.find(b'?') == -1 and path.find(b'*') == -1 and path.find(b'[') == -1:
return [path]
return globber.glob(path)
def FixPath(src: bytes, dst: bytes) -> Tuple[bytes, bytes]:
# rsync-like path munging to make remote specifications shorter.
append = b''
pos = src.rfind(b'/')
if pos >= 0:
if src.endswith(b'/'):
# Final slash: copy to the destination "as is".
pass
else:
# No final slash: destination name == source name.
append = src[pos:]
else:
# No slash at all - use same name at destination.
append = b'/' + src
# Append the destination file name if any.
# BUT: do not append "." or ".." components!
if append != b'/.' and append != b'/..':
dst += append
return (src, dst)
def main() -> None:
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
description='Synchronize a directory between an Android device and the '
'local file system')
parser.add_argument(
'source',
metavar='SRC',
type=str,
nargs='+',
help='The directory to read files/directories from. '
'This must be a local path if -R is not specified, '
'and an Android path if -R is specified. If SRC does '
'not end with a final slash, its last path component '
'is appended to DST (like rsync does).')
parser.add_argument(
'destination',
metavar='DST',
type=str,
help='The directory to write files/directories to. '
'This must be an Android path if -R is not specified, '
'and a local path if -R is specified.')
parser.add_argument(
'-e',
'--adb',
metavar='COMMAND',
default='adb',
type=str,
help='Use the given adb binary and arguments.')
parser.add_argument(
'--device',
action='store_true',
help='Directs command to the only connected USB device; '
'returns an error if more than one USB device is present. '
'Corresponds to the "-d" option of adb.')
parser.add_argument(
'--emulator',
action='store_true',
help='Directs command to the only running emulator; '
'returns an error if more than one emulator is running. '
'Corresponds to the "-e" option of adb.')
parser.add_argument(
'-s',
'--serial',
metavar='DEVICE',
type=str,
help='Directs command to the device or emulator with '
'the given serial number or qualifier. Overrides '
'ANDROID_SERIAL environment variable. Use "adb devices" '
'to list all connected devices with their respective serial number. '
'Corresponds to the "-s" option of adb.')
parser.add_argument(
'-H',
'--host',
metavar='HOST',
type=str,
help='Name of adb server host (default: localhost). '
'Corresponds to the "-H" option of adb.')
parser.add_argument(
'-P',
'--port',
metavar='PORT',
type=str,
help='Port of adb server (default: 5037). '
'Corresponds to the "-P" option of adb.')
parser.add_argument(
'-R',
'--reverse',
action='store_true',
help='Reverse sync (pull, not push).')
parser.add_argument(
'-2',
'--two-way',
action='store_true',
help='Two-way sync (compare modification time; after '
'the sync, both sides will have all files in the '
'respective newest version. This relies on the clocks '
'of your system and the device to match.')
parser.add_argument(
'-t',
'--times',
action='store_true',
help='Preserve modification times when copying.')
parser.add_argument(
'-d',
'--delete',
action='store_true',
help='Delete files from DST that are not present on '
'SRC. Mutually exclusive with -2.')
parser.add_argument(
'-f',
'--force',
action='store_true',
help='Allow deleting files/directories when having to '
'replace a file by a directory or vice versa. This is '
'disabled by default to prevent large scale accidents.')
parser.add_argument(
'-n',
'--no-clobber',
action='store_true',
help='Do not ever overwrite any '
'existing files. Mutually exclusive with -f.')
parser.add_argument(
'-L',
'--copy-links',
action='store_true',
help='transform symlink into referent file/dir')
parser.add_argument(
'--dry-run',
action='store_true',
help='Do not do anything - just show what would be done.')
args = parser.parse_args()
localpatterns = [os.fsencode(x) for x in args.source]
remotepath = os.fsencode(args.destination)
adb_args = os.fsencode(args.adb).split(b' ')
if args.device:
adb_args += [b'-d']
if args.emulator:
adb_args += [b'-e']
if args.serial:
adb_args += [b'-s', os.fsencode(args.serial)]
if args.host:
adb_args += [b'-H', os.fsencode(args.host)]
if args.port:
adb_args += [b'-P', os.fsencode(args.port)]
adb = AdbFileSystem(adb_args)
# Expand wildcards, but only on the remote side.
localpaths = []
remotepaths = []
if args.reverse:
for pattern in localpatterns:
for src in ExpandWildcards(adb, pattern):
src, dst = FixPath(src, remotepath)
localpaths.append(src)
remotepaths.append(dst)
else:
for src in localpatterns:
src, dst = FixPath(src, remotepath)
localpaths.append(src)
remotepaths.append(dst)
preserve_times = args.times
delete_missing = args.delete
allow_replace = args.force
allow_overwrite = not args.no_clobber
copy_links = args.copy_links
dry_run = args.dry_run
local_to_remote = True
remote_to_local = False
if args.two_way:
local_to_remote = True
remote_to_local = True
if args.reverse:
local_to_remote, remote_to_local = remote_to_local, local_to_remote
localpaths, remotepaths = remotepaths, localpaths
if allow_replace and not allow_overwrite:
logging.error('--no-clobber and --force are mutually exclusive.')
parser.print_help()
return
if delete_missing and local_to_remote and remote_to_local:
logging.error('--delete and --two-way are mutually exclusive.')
parser.print_help()
return
# Two-way sync is only allowed with disjoint remote and local path sets.
if (remote_to_local and local_to_remote) or delete_missing:
if ((remote_to_local and len(localpaths) != len(set(localpaths))) or
(local_to_remote and len(remotepaths) != len(set(remotepaths)))):
logging.error(
'--two-way and --delete are only supported for disjoint sets of '
'source and destination paths (in other words, all SRC must '
'differ in basename).')
parser.print_help()
return
for i in range(len(localpaths)):
logging.info('Sync: local %r, remote %r', localpaths[i], remotepaths[i])
syncer = FileSyncer(adb, localpaths[i], remotepaths[i], local_to_remote,
remote_to_local, preserve_times, delete_missing,
allow_overwrite, allow_replace, copy_links, dry_run)
if not syncer.IsWorking():
logging.error('Device not connected or not working.')
return
try:
syncer.ScanAndDiff()
syncer.PerformDeletions()
syncer.PerformOverwrites()
syncer.PerformCopies()
finally:
syncer.TimeReport()
if __name__ == '__main__':
main()