mirror of
https://github.com/osm2pgsql-dev/osm2pgsql.git
synced 2025-08-20 13:20:40 +00:00
513 lines
20 KiB
Python
Executable File
513 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
#
|
|
# This file is part of osm2pgsql (https://osm2pgsql.org/).
|
|
#
|
|
# Copyright (C) 2006-2021 by the osm2pgsql developer community.
|
|
# For a full list of authors see the git log.
|
|
|
|
"""
|
|
Update an osm2pgsql database with changes from a OSM replication server.
|
|
|
|
This tool initialises the updating process by looking at the import file
|
|
or the newest object in the database. The state is then saved in a table
|
|
in the database. Subsequent runs download newly available data and apply
|
|
it to the database.
|
|
|
|
See the help of the 'init' and 'update' command for more information on
|
|
how to use %(prog)s.
|
|
"""
|
|
|
|
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
|
import datetime as dt
|
|
import json
|
|
import logging
|
|
import sys
|
|
import subprocess
|
|
import tempfile
|
|
from textwrap import dedent
|
|
import traceback
|
|
from pathlib import Path
|
|
import urllib.request as urlrequest
|
|
|
|
import psycopg2
|
|
|
|
from osmium.replication.server import ReplicationServer
|
|
from osmium.replication.utils import get_replication_header
|
|
from osmium import WriteHandler
|
|
|
|
LOG = logging.getLogger()
|
|
|
|
def pretty_format_timedelta(seconds):
|
|
minutes = int(seconds/60)
|
|
(hours, minutes) = divmod(minutes, 60)
|
|
(days, hours) = divmod(hours, 24)
|
|
(weeks, days) = divmod(days, 7)
|
|
|
|
if 0 < seconds < 60:
|
|
output = "<1 minute"
|
|
else:
|
|
output = []
|
|
# If weeks > 1 but hours == 0, we still want to show "0 hours"
|
|
if weeks > 0:
|
|
output.append("{} week(s)".format(weeks))
|
|
if days > 0 or weeks > 0:
|
|
output.append("{} day(s)".format(days))
|
|
if hours > 0 or days > 0 or weeks > 0:
|
|
output.append("{} hour(s)".format(hours))
|
|
|
|
output.append("{} minute(s)".format(minutes))
|
|
output = " ".join(output)
|
|
return output
|
|
|
|
def connect(args):
|
|
""" Create a connection from the given command line arguments.
|
|
"""
|
|
return psycopg2.connect(dbname=args.database, user=args.username,
|
|
host=args.host, port=args.port)
|
|
|
|
|
|
def compute_database_date(conn, prefix):
|
|
""" Determine the date of the database from the newest object in the
|
|
database.
|
|
"""
|
|
# First, find the way with the highest ID in the database
|
|
# Using nodes would be more reliable but those are not cached by osm2pgsql.
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT max(id) FROM {}_ways".format(prefix))
|
|
osmid = cur.fetchone()[0] if cur.rowcount == 1 else None
|
|
|
|
if osmid is None:
|
|
LOG.fatal("No data found in the database.")
|
|
return None
|
|
|
|
LOG.debug("Using way id %d for timestamp lookup", osmid)
|
|
# Get the way from the API to find the timestamp when it was created.
|
|
url = 'https://www.openstreetmap.org/api/0.6/way/{}/1'.format(osmid)
|
|
headers = {"User-Agent" : "osm2pgsql-update",
|
|
"Accept" : "application/json"}
|
|
with urlrequest.urlopen(urlrequest.Request(url, headers=headers)) as response:
|
|
data = json.loads(response.read().decode('utf-8'))
|
|
|
|
if not data.get('elements') or not 'timestamp' in data['elements'][0]:
|
|
LOG.fatal("The way data downloaded from the API does not contain valid data.\n"
|
|
"URL used: %s", url)
|
|
return None
|
|
|
|
date = data['elements'][0]['timestamp']
|
|
LOG.debug("Found timestamp %s", date)
|
|
|
|
try:
|
|
date = dt.datetime.strptime(date, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=dt.timezone.utc)
|
|
except ValueError:
|
|
LOG.fatal("Cannot parse timestamp '%s'", date)
|
|
return None
|
|
|
|
return date.replace(tzinfo=dt.timezone.utc)
|
|
|
|
|
|
def setup_replication_state(conn, table, base_url, seq, date):
|
|
""" (Re)create the table for the replication state and fill it with
|
|
the given state.
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute('DROP TABLE IF EXISTS "{}"'.format(table))
|
|
cur.execute("""CREATE TABLE "{}"
|
|
(url TEXT,
|
|
sequence INTEGER,
|
|
importdate TIMESTAMP WITH TIME ZONE)
|
|
""".format(table))
|
|
cur.execute('INSERT INTO "{}" VALUES(%s, %s, %s)'.format(table),
|
|
(base_url, seq, date))
|
|
conn.commit()
|
|
|
|
|
|
def update_replication_state(conn, table, seq, date):
|
|
""" Update sequence and date in the replication state table.
|
|
The table is assumed to exist.
|
|
"""
|
|
with conn.cursor() as cur:
|
|
if date is not None:
|
|
cur.execute('UPDATE "{}" SET sequence=%s, importdate=%s'.format(table),
|
|
(seq, date))
|
|
else:
|
|
cur.execute('UPDATE "{}" SET sequence=%s'.format(table),
|
|
(seq,))
|
|
|
|
conn.commit()
|
|
|
|
def status(conn, args):
|
|
"""\
|
|
Print information about the current replication status, optionally as JSON.
|
|
|
|
Sample output:
|
|
|
|
2021-08-17 15:20:28 [INFO]: Using replication service 'https://planet.openstreetmap.org/replication/minute', which is at sequence 4675115 ( 2021-08-17T13:19:43Z )
|
|
2021-08-17 15:20:28 [INFO]: Replication server's most recent data is <1 minute old
|
|
2021-08-17 15:20:28 [INFO]: Local database is 8288 sequences behind the server, i.e. 5 day(s) 20 hour(s) 58 minute(s)
|
|
2021-08-17 15:20:28 [INFO]: Local database's most recent data is 5 day(s) 20 hour(s) 59 minute(s) old
|
|
|
|
|
|
With the '--json' option, the status is printed as a json object.
|
|
|
|
{
|
|
"server": {
|
|
"base_url": "https://planet.openstreetmap.org/replication/minute",
|
|
"sequence": 4675116,
|
|
"timestamp": "2021-08-17T13:20:43Z",
|
|
"age_sec": 27
|
|
},
|
|
"local": {
|
|
"sequence": 4666827,
|
|
"timestamp": "2021-08-11T16:21:09Z",
|
|
"age_sec": 507601
|
|
},
|
|
"status": 0
|
|
}
|
|
|
|
|
|
'status' is 0 if there were no problems getting the status. 1 & 2 for
|
|
improperly set up replication. 3 for network issues. If status ≠ 0, then
|
|
the 'error' key is an error message (as string). 'status' is used as the
|
|
exit code.
|
|
|
|
'server' is the replication server's current status. 'sequence' is it's
|
|
sequence number, 'timestamp' the time of that, and 'age_sec' the age of the
|
|
data in seconds.
|
|
|
|
'local' is the status of your server.
|
|
"""
|
|
|
|
results = {}
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute('SELECT * FROM pg_tables where tablename = %s', (args.table, ))
|
|
if cur.rowcount < 1:
|
|
results['status'] = 1
|
|
results['error'] = "Cannot find replication status table. Run 'osm2pgsql-replication init' first."
|
|
else:
|
|
cur.execute('SELECT * FROM "{}"'.format(args.table))
|
|
if cur.rowcount != 1:
|
|
results['status'] = 2
|
|
results['error'] = "Updates not set up correctly. Run 'osm2pgsql-updates init' first."
|
|
else:
|
|
|
|
base_url, db_seq, db_ts = cur.fetchone()
|
|
db_ts = db_ts.astimezone(dt.timezone.utc)
|
|
results['server'] = {}
|
|
results['local'] = {}
|
|
results['server']['base_url'] = base_url
|
|
results['local']['sequence'] = db_seq
|
|
results['local']['timestamp'] = db_ts.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
repl = ReplicationServer(base_url)
|
|
state_info = repl.get_state_info()
|
|
if state_info is None:
|
|
# PyOsmium was unable to download the state information
|
|
results['status'] = 3
|
|
results['error'] = "Unable to download the state information from {}".format(base_url)
|
|
else:
|
|
results['status'] = 0
|
|
now = dt.datetime.now(dt.timezone.utc)
|
|
|
|
server_seq, server_ts = state_info
|
|
server_ts = server_ts.astimezone(dt.timezone.utc)
|
|
|
|
results['server']['sequence'] = server_seq
|
|
results['server']['timestamp'] = server_ts.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
results['server']['age_sec'] = int((now-server_ts).total_seconds())
|
|
|
|
results['local']['age_sec'] = int((now - db_ts).total_seconds())
|
|
|
|
if args.json:
|
|
print(json.dumps(results))
|
|
else:
|
|
if results['status'] != 0:
|
|
LOG.fatal(results['error'])
|
|
else:
|
|
print("Using replication service '{}', which is at sequence {} ( {} )".format(
|
|
results['server']['base_url'], results['server']['sequence'], results['server']['timestamp']))
|
|
print("Replication server's most recent data is {} old".format(pretty_format_timedelta(results['server']['age_sec'])))
|
|
|
|
if results['local']['sequence'] == results['server']['sequence']:
|
|
print("Local database is up to date with server")
|
|
else:
|
|
print("Local database is {} sequences behind the server, i.e. {}".format(
|
|
results['server']['sequence'] - results['local']['sequence'],
|
|
pretty_format_timedelta(results['local']['age_sec'] - results['server']['age_sec'])
|
|
))
|
|
|
|
print("Local database's most recent data is {} old".format(pretty_format_timedelta(results['local']['age_sec'])))
|
|
|
|
|
|
return results['status']
|
|
|
|
|
|
def init(conn, args):
|
|
"""\
|
|
Initialise the replication process.
|
|
|
|
There are two ways to initialise the replication process: if you have imported
|
|
from a file that contains replication source information, then the
|
|
initialisation process can use this and set up replication from there.
|
|
Use the command '%(prog)s --osm-file <filename>' for this.
|
|
|
|
If the file has no replication information or you don't have the initial
|
|
import file anymore then replication can be set up according to
|
|
the data found in the database. It checks the planet_osm_way table for the
|
|
newest way in the database and then queries the OSM API when the way was
|
|
created. The date is used as the start date for replication. In this mode
|
|
the minutely diffs from the OSM servers are used as a source. You can change
|
|
this with the '--server' parameter.
|
|
"""
|
|
if args.osm_file is None:
|
|
date = compute_database_date(conn, args.prefix)
|
|
if date is None:
|
|
return 1
|
|
|
|
date = date - dt.timedelta(hours=3)
|
|
base_url = args.server
|
|
seq = None
|
|
else:
|
|
base_url, seq, date = get_replication_header(args.osm_file)
|
|
if base_url is None or (seq is None and date is None):
|
|
LOG.fatal("File '%s' has no usable replication headers. Use '--server' instead.")
|
|
return 1
|
|
|
|
repl = ReplicationServer(base_url)
|
|
if seq is None:
|
|
seq = repl.timestamp_to_sequence(date)
|
|
|
|
if seq is None:
|
|
LOG.fatal("Cannot reach the configured replication service '%s'.\n"
|
|
"Does the URL point to a directory containing OSM update data?",
|
|
base_url)
|
|
return 1
|
|
|
|
if date is None:
|
|
state = repl.get_state_info(seq)
|
|
if state is None:
|
|
LOG.fatal("Cannot reach the configured replication service '%s'.\n"
|
|
"Does the URL point to a directory containing OSM update data?",
|
|
base_url)
|
|
|
|
setup_replication_state(conn, args.table, base_url, seq, date)
|
|
|
|
LOG.info("Initialised updates for service '%s'.", base_url)
|
|
LOG.info("Starting at sequence %d (%s).", seq, date)
|
|
|
|
return 0
|
|
|
|
def update(conn, args):
|
|
"""\
|
|
Download newly available data and apply it to the database.
|
|
|
|
The data is downloaded in chunks of '--max-diff-size' MB. Each chunk is
|
|
saved in a temporary file and imported with osm2pgsql from there. The
|
|
temporary file is normally deleted afterwards unless you state an explicit
|
|
location with '--diff-file'. Once the database is up to date with the
|
|
replication source, the update process exits with 0.
|
|
|
|
Any additional arguments to osm2pgsql need to be given after '--'. Database
|
|
and the prefix parameter are handed through to osm2pgsql. They do not need
|
|
to be repeated. '--append' and '--slim' will always be added as well.
|
|
|
|
Use the '--post-processing' parameter to execute a script after osm2pgsql has
|
|
run successfully. If the updates consists of multiple runs because the
|
|
maximum size of downloaded data was reached, then the script is executed
|
|
each time that osm2pgsql has run. When the post-processing fails, then
|
|
the entire update run is considered a failure and the replication information
|
|
is not updated. That means that when 'update' is run the next time it will
|
|
recommence with downloading the diffs again and reapplying them to the
|
|
database. This is usually safe. The script receives two parameters:
|
|
the sequence ID and timestamp of the last successful run. The timestamp
|
|
may be missing in the rare case that the replication service stops responding
|
|
after the updates have been downloaded.
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute('SELECT * FROM pg_tables where tablename = %s', (args.table, ))
|
|
if cur.rowcount < 1:
|
|
LOG.fatal("Cannot find replication status table. "
|
|
"Run 'osm2pgsql-replication init' first.")
|
|
return 1
|
|
|
|
cur.execute('SELECT * FROM "{}"'.format(args.table))
|
|
if cur.rowcount != 1:
|
|
LOG.fatal("Updates not set up correctly. Run 'osm2pgsql-updates init' first.")
|
|
return 1
|
|
|
|
base_url, seq, ts = cur.fetchone()
|
|
LOG.info("Using replication service '%s'. Current sequence %d (%s).",
|
|
base_url, seq, ts)
|
|
|
|
repl = ReplicationServer(base_url)
|
|
current = repl.get_state_info()
|
|
|
|
if seq >= current.sequence:
|
|
LOG.info("Database already up-to-date.")
|
|
return 0
|
|
|
|
if args.diff_file is not None:
|
|
outfile = Path(args.diff_file)
|
|
else:
|
|
tmpdir = tempfile.TemporaryDirectory()
|
|
outfile = Path(tmpdir.name) / 'osm2pgsql_diff.osc.gz'
|
|
|
|
osm2pgsql = [args.osm2pgsql_cmd, '--append', '--slim', '--prefix', args.prefix]
|
|
osm2pgsql.extend(args.extra_params)
|
|
if args.database:
|
|
osm2pgsql.extend(('-d', args.database))
|
|
if args.username:
|
|
osm2pgsql.extend(('-U', args.username))
|
|
if args.host:
|
|
osm2pgsql.extend(('-H', args.host))
|
|
if args.port:
|
|
osm2pgsql.extend(('-P', args.port))
|
|
osm2pgsql.append(str(outfile))
|
|
LOG.debug("Calling osm2pgsql with: %s", ' '.join(osm2pgsql))
|
|
|
|
while seq < current.sequence:
|
|
LOG.debug("Importing from sequence %d", seq)
|
|
if outfile.exists():
|
|
outfile.unlink()
|
|
outhandler = WriteHandler(str(outfile))
|
|
endseq = repl.apply_diffs(outhandler, seq + 1,
|
|
max_size=args.max_diff_size * 1024)
|
|
outhandler.close()
|
|
|
|
if endseq is None:
|
|
LOG.debug("No new diffs found.")
|
|
break
|
|
|
|
subprocess.run(osm2pgsql, check=True)
|
|
seq = endseq
|
|
|
|
nextstate = repl.get_state_info(seq)
|
|
timestamp = nextstate.timestamp if nextstate else None
|
|
|
|
if args.post_processing:
|
|
cmd = [args.post_processing, str(endseq), str(timestamp or '')]
|
|
LOG.debug('Calling post-processing script: %s', ' '.join(cmd))
|
|
subprocess.run(cmd, check=True)
|
|
|
|
update_replication_state(conn, args.table, seq,
|
|
nextstate.timestamp if nextstate else None)
|
|
|
|
if nextstate is not None:
|
|
LOG.info("Data imported until %s. Backlog remaining: %s",
|
|
nextstate.timestamp,
|
|
dt.datetime.now(dt.timezone.utc) - nextstate.timestamp)
|
|
|
|
if args.once:
|
|
break
|
|
|
|
return 0
|
|
|
|
def get_parser():
|
|
parser = ArgumentParser(description=__doc__,
|
|
prog='osm2pgsql-replication',
|
|
formatter_class=RawDescriptionHelpFormatter)
|
|
subs = parser.add_subparsers(title='available commands', dest='subcommand')
|
|
|
|
default_args = ArgumentParser(add_help=False)
|
|
group = default_args.add_argument_group('Default arguments')
|
|
group.add_argument('-h', '--help', action='help',
|
|
help='Show this help message and exit')
|
|
group.add_argument('-q', '--quiet', action='store_const', const=0,
|
|
dest='verbose', default=2,
|
|
help='Print only error messages')
|
|
group.add_argument('-v', '--verbose', action='count', default=2,
|
|
help='Increase verboseness of output')
|
|
group = default_args.add_argument_group('Database arguments')
|
|
group.add_argument('-d', '--database', metavar='DB',
|
|
help='Name of PostgreSQL database to connect to or conninfo string')
|
|
group.add_argument('-U', '--username', metavar='NAME',
|
|
help='PostgreSQL user name')
|
|
group.add_argument('-H', '--host', metavar='HOST',
|
|
help='Database server host name or socket location')
|
|
group.add_argument('-P', '--port', metavar='PORT',
|
|
help='Database server port')
|
|
group.add_argument('-p', '--prefix', metavar='PREFIX', default='planet_osm',
|
|
help="Prefix for table names (default 'planet_osm')")
|
|
|
|
# Arguments for init
|
|
cmd = subs.add_parser('init', parents=[default_args],
|
|
help=init.__doc__.split('\n', 1)[0],
|
|
description=dedent(init.__doc__),
|
|
formatter_class=RawDescriptionHelpFormatter,
|
|
add_help=False)
|
|
grp = cmd.add_argument_group('Replication source arguments')
|
|
srcgrp = grp.add_mutually_exclusive_group()
|
|
srcgrp.add_argument('--osm-file', metavar='FILE',
|
|
help='Get replication information from the given file.')
|
|
srcgrp.add_argument('--server', metavar='URL',
|
|
default='https://planet.openstreetmap.org/replication/minute',
|
|
help='Use replication server at the given URL (default: %(default)s)')
|
|
cmd.set_defaults(handler=init)
|
|
|
|
# Arguments for update
|
|
cmd = subs.add_parser('update', parents=[default_args],
|
|
usage='%(prog)s update [options] [-- param [param ...]]',
|
|
help=update.__doc__.split('\n', 1)[0],
|
|
description=dedent(update.__doc__),
|
|
formatter_class=RawDescriptionHelpFormatter,
|
|
add_help=False)
|
|
cmd.set_defaults(handler=update)
|
|
cmd.add_argument('extra_params', nargs='*', metavar='param',
|
|
help='Extra parameters to hand in to osm2pgsql.')
|
|
grp = cmd.add_argument_group('Update process arguments')
|
|
cmd.add_argument('--diff-file', metavar='FILE',
|
|
help='File to save changes before they are applied to osm2pgsql.')
|
|
cmd.add_argument('--max-diff-size', type=int, default=500,
|
|
help='Maximum data to load in MB (default: 500MB)')
|
|
cmd.add_argument('--osm2pgsql-cmd', default='osm2pgsql',
|
|
help='Path to osm2pgsql command (default: osm2pgsql)')
|
|
cmd.add_argument('--once', action='store_true',
|
|
help='Run updates only once, even when more data is available.')
|
|
cmd.add_argument('--post-processing', metavar='SCRIPT',
|
|
help='Post-processing script to run after each execution of osm2pgsql.')
|
|
|
|
# Arguments for status
|
|
cmd = subs.add_parser('status', parents=[default_args],
|
|
help=status.__doc__.split('\n', 1)[0],
|
|
description=dedent(status.__doc__),
|
|
formatter_class=RawDescriptionHelpFormatter,
|
|
add_help=False)
|
|
cmd.add_argument('--json', action="store_true", default=False, help="Output status as json.")
|
|
cmd.set_defaults(handler=status)
|
|
|
|
|
|
return parser
|
|
|
|
def main():
|
|
parser = get_parser()
|
|
args = parser.parse_args()
|
|
|
|
if args.subcommand is None:
|
|
parser.print_help()
|
|
exit(1)
|
|
|
|
logging.basicConfig(stream=sys.stderr,
|
|
format='{asctime} [{levelname}]: {message}',
|
|
style='{',
|
|
datefmt='%Y-%m-%d %H:%M:%S',
|
|
level=max(4 - args.verbose, 1) * 10)
|
|
|
|
if '"' in args.prefix:
|
|
LOG.fatal("Prefix must not contain quotation marks.")
|
|
return 1
|
|
|
|
args.table = '{}_replication_status'.format(args.prefix)
|
|
|
|
conn = connect(args)
|
|
ret = args.handler(conn, args)
|
|
conn.close()
|
|
|
|
return ret
|
|
|
|
|
|
if __name__ == '__main__':
|
|
exit(main())
|