#!/usr/bin/env python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# This file is part of osm2pgsql (https://osm2pgsql.org/).
#
# Copyright (C) 2006-2025 by the osm2pgsql developer community.
# For a full list of authors see the git log.

"""
Update an osm2pgsql database with changes from an OSM replication server.

This tool initialises the updating process by looking at the import file
or the newest object in the database. The state is then saved in a table
in the database. Subsequent runs download newly available data and apply
it to the database.

See the help of the `init` and `update` command for more information on
how to use %(prog)s.
"""

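# Example invocations (illustrative; 'osm' stands for your database name):
#   osm2pgsql-replication init -d osm
#   osm2pgsql-replication update -d osm --max-diff-size 100
#   osm2pgsql-replication status -d osm --json
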
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import datetime as dt
import json
import logging
import sys
import subprocess
import tempfile
from textwrap import dedent
from pathlib import Path
import urllib.request as urlrequest

missing_modules = []

try:
    import psycopg2 as psycopg
    from psycopg2 import sql
except ImportError:
    try:
        import psycopg
        from psycopg import sql
    except ImportError:
        missing_modules.append('psycopg2')

try:
    from osmium.replication.server import ReplicationServer
    from osmium.replication.utils import get_replication_header
    from osmium.replication import newest_change_from_file
    from osmium import WriteHandler
except ImportError:
    missing_modules.append('osmium')

LOG = logging.getLogger()

OSM2PGSQL_PATH = Path(__file__).parent.resolve() / 'osm2pgsql'

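# Render a duration given in seconds as a human-readable string, for example
# pretty_format_timedelta(90061) -> "1 day(s) 1 hour(s) 1 minute(s) 1 second(s)".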
def pretty_format_timedelta(seconds):
    seconds = int(seconds)
    (minutes, seconds) = divmod(seconds, 60)
    (hours, minutes) = divmod(minutes, 60)
    (days, hours) = divmod(hours, 24)
    (weeks, days) = divmod(days, 7)

    output = []
    # If weeks > 1 but hours == 0, we still want to show "0 hours"
    if weeks > 0:
        output.append("{} week(s)".format(weeks))
    if days > 0 or weeks > 0:
        output.append("{} day(s)".format(days))
    if hours > 0 or days > 0 or weeks > 0:
        output.append("{} hour(s)".format(hours))
    if minutes > 0 or hours > 0 or days > 0 or weeks > 0:
        output.append("{} minute(s)".format(minutes))

    output.append("{} second(s)".format(seconds))

    output = " ".join(output)
    return output


def osm_date(date):
    return date.strftime('%Y-%m-%dT%H:%M:%SZ')


def from_osm_date(datestr):
    # Accept an already-parsed datetime as well as an OSM timestamp string
    # (e.g. the state timestamp returned by pyosmium is already a datetime).
    if isinstance(datestr, dt.datetime):
        return datestr if datestr.tzinfo is not None else datestr.replace(tzinfo=dt.timezone.utc)
    return dt.datetime.strptime(datestr, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=dt.timezone.utc)


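# Parse the --start-at value: a plain number is taken as minutes to roll back,
# anything else must be an ISO or OSM-style timestamp.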
def start_point(param):
    if param.isdigit():
        return int(param)

    if sys.version_info >= (3, 7):
        try:
            date = dt.datetime.fromisoformat(param)
            if date.tzinfo is None:
                date = date.replace(tzinfo=dt.timezone.utc)
            return date
        except ValueError:
            pass

    return from_osm_date(param)


class DBError(Exception):

    def __init__(self, errno, msg):
        self.errno = errno
        self.msg = msg


class DBConnection:

    def __init__(self, schema, args):
        self.schema = schema

        # If dbname looks like a conninfo string use it as such
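        # e.g. '-d osm' connects to the database "osm", while (illustrative values)
        # '-d postgresql://user@host/osm' or '-d "dbname=osm host=localhost"'
        # are passed through to psycopg as connection strings.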
        if args.database and any(part in args.database for part in ['=', '://']):
            self.conn = psycopg.connect(args.database,
                                        fallback_application_name="osm2pgsql-replication")
        else:
            self.conn = psycopg.connect(dbname=args.database, user=args.username,
                                        host=args.host, port=args.port,
                                        fallback_application_name="osm2pgsql-replication")

        self.name = self.conn.info.dbname

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.conn is not None:
            self.conn.close()

    def table_exists(self, table_name):
        with self.conn.cursor() as cur:
            cur.execute('SELECT * FROM pg_tables WHERE tablename = %s and schemaname = %s ',
                        (table_name, self.schema))
            return cur.rowcount > 0

    def table_id(self, name):
        return sql.Identifier(self.schema, name)


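# Replication bookkeeping for databases imported with osm2pgsql >= 1.9:
# state is read from and written to the osm2pgsql_properties table created
# by the import.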
class Osm2pgsqlProperties:
    PROP_TABLE_NAME = 'osm2pgsql_properties'

    def __init__(self, db):
        self.db = db
        self.is_updatable = self._get_prop('updatable') == 'true'

    def _get_prop(self, name):
        with self.db.conn.cursor() as cur:
            cur.execute(sql.SQL("SELECT value FROM {} WHERE property = %s")
                        .format(self.db.table_id(self.PROP_TABLE_NAME)),
                        (name, ))
            return cur.fetchone()[0] if cur.rowcount == 1 else None

    def _set_prop(self, name, value):
        with self.db.conn.cursor() as cur:
            cur.execute(sql.SQL("""INSERT INTO {} (property, value) VALUES (%s, %s)
                                   ON CONFLICT (property)
                                   DO UPDATE SET value = EXCLUDED.value""")
                        .format(self.db.table_id(self.PROP_TABLE_NAME)),
                        (name, value))

    def get_replication_base(self, server, start_at):
        seq, date = None, None
        if server is None:
            server = self._get_prop('replication_base_url')
            if server:
                seq = self._get_prop('replication_sequence_number')
                date = self._get_prop('replication_timestamp')
                if date is not None:
                    date = from_osm_date(date)
            else:
                server = 'https://planet.openstreetmap.org/replication/minute'

        if isinstance(start_at, dt.datetime):
            return server, None, start_at

        if seq is None or isinstance(start_at, int):
            date = self._get_prop('current_timestamp')
            if date is None:
                raise DBError(1, "Cannot get timestamp from database. "
                                 "Use --start-at to set an explicit date.")

            date = from_osm_date(date)
            date -= dt.timedelta(minutes=start_at or 180)
            seq = None
        else:
            seq = int(seq)

        return server, seq, date

    def get_replication_state(self):
        if not self.db.table_exists(self.PROP_TABLE_NAME):
            raise DBError(1, "Cannot find replication status table. Run 'osm2pgsql-replication init' first.")

        base_url = self._get_prop('replication_base_url')
        seq = self._get_prop('replication_sequence_number')
        date = self._get_prop('replication_timestamp')

        if base_url is None or seq is None or date is None:
            raise DBError(2, "Updates not set up correctly. Run 'osm2pgsql-replication init' first.")

        return base_url, int(seq), from_osm_date(date)

    def write_replication_state(self, base_url, seq, date):
        self._set_prop('replication_base_url', base_url)
        self._set_prop('replication_sequence_number', seq)
        if date is not None:
            self._set_prop('replication_timestamp', osm_date(date))
        self.db.conn.commit()


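# Replication bookkeeping for databases without an osm2pgsql properties table:
# state lives in a separate <prefix>_replication_status table and the start
# date is derived from the newest way in the database via the OSM API.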
class LegacyProperties:

    def __init__(self, db, prefix):
        self.db = db
        self.prop_table = f'{prefix}_replication_status'
        self.way_table = f'{prefix}_ways'
        self.is_updatable = db.table_exists(self.way_table)

    def get_replication_base(self, server, start_at):
        """ Determine the date of the database from the newest object in the
            database.
        """
        if server is None:
            server = 'https://planet.openstreetmap.org/replication/minute'

        if isinstance(start_at, dt.datetime):
            return server, None, start_at

        # First, find the way with the highest ID in the database
        # Using nodes would be more reliable but those are not cached by osm2pgsql.
        with self.db.conn.cursor() as cur:
            cur.execute(sql.SQL("SELECT max(id) FROM {}")
                        .format(self.db.table_id(self.way_table)))
            osmid = cur.fetchone()[0] if cur.rowcount == 1 else None

        if osmid is None:
            raise DBError(1, "No data found in the database.")

        LOG.debug("Using way id %d for timestamp lookup", osmid)
        # Get the way from the API to find the timestamp when it was created.
        url = 'https://www.openstreetmap.org/api/0.6/way/{}/1'.format(osmid)
        headers = {"User-Agent" : "osm2pgsql-replication",
                   "Accept" : "application/json"}
        with urlrequest.urlopen(urlrequest.Request(url, headers=headers)) as response:
            data = json.loads(response.read().decode('utf-8'))

        if not data.get('elements') or 'timestamp' not in data['elements'][0]:
            raise DBError(1, "The way data downloaded from the API does not contain valid data.\n"
                             f"URL used: {url}")

        date = data['elements'][0]['timestamp']
        LOG.debug("Found timestamp %s", date)

        try:
            date = from_osm_date(date)
        except ValueError:
            raise DBError(1, f"Cannot parse timestamp '{date}'")

        date -= dt.timedelta(minutes=start_at or 180)

        return server, None, date

    def get_replication_state(self):
        if not self.db.table_exists(self.prop_table):
            raise DBError(1, "Cannot find replication status table. Run 'osm2pgsql-replication init' first.")

        with self.db.conn.cursor() as cur:
            cur.execute(sql.SQL('SELECT * FROM {}').format(self.db.table_id(self.prop_table)))
            if cur.rowcount != 1:
                raise DBError(2, "Updates not set up correctly. Run 'osm2pgsql-replication init' first.")

            base_url, seq, date = cur.fetchone()

        if base_url is None or seq is None or date is None:
            raise DBError(2, "Updates not set up correctly. Run 'osm2pgsql-replication init' first.")

        return base_url, seq, date

    def write_replication_state(self, base_url, seq, date):
        table = self.db.table_id(self.prop_table)
        with self.db.conn.cursor() as cur:
            if not self.db.table_exists(self.prop_table):
                cur.execute(sql.SQL("""CREATE TABLE {}
                                       (url TEXT,
                                        sequence INTEGER,
                                        importdate TIMESTAMP WITH TIME ZONE)
                                    """).format(table))
            cur.execute(sql.SQL('TRUNCATE {}').format(table))
            if date:
                cur.execute(sql.SQL('INSERT INTO {} VALUES(%s, %s, %s)').format(table),
                            (base_url, seq, date))
            else:
                cur.execute(sql.SQL('INSERT INTO {} VALUES(%s, %s)').format(table),
                            (base_url, seq))
        self.db.conn.commit()


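# Gather the replication status of the local database and the remote server as
# a dict; 'status' is 0 on success, 1/2 when replication is not set up and 3
# on network problems (see the status() docstring below).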
def get_status_info(props, args):
    results = {'status': 0}

    base_url, db_seq, db_ts = props.get_replication_state()

    db_ts = db_ts.astimezone(dt.timezone.utc)
    results['server'] = {'base_url': base_url}
    results['local'] = {'sequence': db_seq, 'timestamp': osm_date(db_ts)}

    repl = ReplicationServer(base_url)
    state_info = repl.get_state_info()
    if state_info is None:
        # PyOsmium was unable to download the state information
        results['status'] = 3
        results['error'] = "Unable to download the state information from {}".format(base_url)
    else:
        results['status'] = 0
        now = dt.datetime.now(dt.timezone.utc)

        server_seq, server_ts = state_info
        server_ts = server_ts.astimezone(dt.timezone.utc)

        results['server']['sequence'] = server_seq
        results['server']['timestamp'] = osm_date(server_ts)
        results['server']['age_sec'] = int((now - server_ts).total_seconds())

        results['local']['age_sec'] = int((now - db_ts).total_seconds())

    return results


def status(props, args):
    """\
    Print information about the current replication status, optionally as JSON.

    Sample output:

        2021-08-17 15:20:28 [INFO]: Using replication service 'https://planet.openstreetmap.org/replication/minute', which is at sequence 4675115 ( 2021-08-17T13:19:43Z )
        2021-08-17 15:20:28 [INFO]: Replication server's most recent data is <1 minute old
        2021-08-17 15:20:28 [INFO]: Local database is 8288 sequences behind the server, i.e. 5 day(s) 20 hour(s) 58 minute(s)
        2021-08-17 15:20:28 [INFO]: Local database's most recent data is 5 day(s) 20 hour(s) 59 minute(s) old


    With the `--json` option, the status is printed as a JSON object.

        {
          "server": {
            "base_url": "https://planet.openstreetmap.org/replication/minute",
            "sequence": 4675116,
            "timestamp": "2021-08-17T13:20:43Z",
            "age_sec": 27
          },
          "local": {
            "sequence": 4666827,
            "timestamp": "2021-08-11T16:21:09Z",
            "age_sec": 507601
          },
          "status": 0
        }


    `status` is 0 if there were no problems getting the status. 1 and 2 are
    used for improperly set up replication, 3 for network issues. If status is
    greater than 0, then the `error` key contains an error message (as a
    string). `status` is used as the exit code.

    `server` is the replication server's current status. `sequence` is its
    sequence number, `timestamp` the time of that sequence, and `age_sec` the
    age of the data in seconds.

    `local` is the status of your server.
    """
    try:
        results = get_status_info(props, args)
    except DBError as err:
        results = {'status': err.errno, 'error': err.msg}

    if args.json:
        print(json.dumps(results))
    else:
        if results['status'] != 0:
            LOG.fatal(results['error'])
        else:
            print("Using replication service '{}', which is at sequence {} ( {} )".format(
                results['server']['base_url'], results['server']['sequence'], results['server']['timestamp']))
            print("Replication server's most recent data is {} old".format(pretty_format_timedelta(results['server']['age_sec'])))

            if results['local']['sequence'] == results['server']['sequence']:
                print("Local database is up to date with server")
            else:
                print("Local database is {} sequences behind the server, i.e. {}".format(
                    results['server']['sequence'] - results['local']['sequence'],
                    pretty_format_timedelta(results['local']['age_sec'] - results['server']['age_sec'])
                ))

            print("Local database's most recent data is {} old".format(pretty_format_timedelta(results['local']['age_sec'])))

    return results['status']


def init(props, args):
    """\
    Initialise the replication process.

    This function sets the replication service to use and determines from
    which date to apply updates. You must call this function at least once
    to set up the replication process. It can safely be called again later
    to change the replication server or to roll back the update process and
    reapply updates.

    There are different methods available for initialisation. When no
    additional parameters are given, the data is initialised from the data
    in the database. If the data was imported from a file with replication
    information and the properties table is available (for osm2pgsql >= 1.9),
    then the replication information from the file is used. Otherwise the
    minutely update service from openstreetmap.org is used as the default
    replication service. The start date is either taken from the database
    timestamp (for osm2pgsql >= 1.9) or determined from the newest way in the
    database by querying the OSM API about its creation date.

    The replication service can be changed with the `--server` parameter.
    To use a different start date, add `--start-at` with an absolute
    ISO timestamp (e.g. 2007-08-20T12:21:53Z). When the program determines the
    start date from the database timestamp or way creation date, then it
    subtracts another 3 hours by default to ensure that all new changes are
    available. To change this rollback period, use `--start-at` with the
    number of minutes to roll back. This rollback mode can also be used to
    force initialisation to use the database date and ignore the date
    from the replication information in the file.

    The initialisation process can also use replication information from
    an OSM file directly and ignore all other date information.
    Use the command `%(prog)s --osm-file <filename>` for this.
    """
    if args.osm_file is None:
        base_url, seq, date = props.get_replication_base(args.server, args.start_at)
    else:
        base_url, seq, date = get_replication_header(args.osm_file)
        if base_url is None or (seq is None and date is None):
            raise DBError(1, f"File '{args.osm_file}' has no usable replication headers. Use '--server' instead.")

    repl = ReplicationServer(base_url)
    if seq is None:
        seq = repl.timestamp_to_sequence(date)
        if seq is None:
            raise DBError(1, f"Cannot reach the configured replication service '{base_url}'.\n"
                             "Does the URL point to a directory containing OSM update data?")

    state = repl.get_state_info(seq)
    if state is None:
        raise DBError(1, f"Cannot load state information for {seq} from replication service {base_url}.\n"
                         + (f"The server may not have diffs going as far back as {date}."
                            if date is not None else
                            "Does the URL point to a directory containing OSM update data?"))

    if date is None:
        date = from_osm_date(state.timestamp)
    else:
        # Sanity check. Is the sequence in line with the date requested?
        if state.timestamp > date:
            raise DBError(1, "The replication service does not have diff files for the requested date.\n"
                             f"Replication service used: {base_url}\n"
                             f"Most recent diffs available start at: {state.timestamp}\n"
                             f"Database date: {date}")

    props.write_replication_state(base_url, seq, date)

    LOG.info("Initialised updates for service '%s'.", base_url)
    LOG.info("Starting at sequence %d (%s).", seq, osm_date(date))

    return 0


def update(props, args):
    """\
    Download newly available data and apply it to the database.

    The data is downloaded in chunks of `--max-diff-size` MB. Each chunk is
    saved in a temporary file and imported with osm2pgsql from there. The
    temporary file is normally deleted afterwards unless you state an explicit
    location with `--diff-file`. Once the database is up to date with the
    replication source, the update process exits with 0.

    Any additional arguments to osm2pgsql need to be given after `--`. The
    database and prefix parameters are handed through to osm2pgsql. They do
    not need to be repeated. `--append` and `--slim` will always be added as
    well.

    Use the `--post-processing` parameter to execute a script after osm2pgsql
    has run successfully. If the update consists of multiple runs because the
    maximum size of downloaded data was reached, then the script is executed
    each time that osm2pgsql has run. When the post-processing fails, the
    entire update run is considered a failure and the replication information
    is not updated. That means that when 'update' is run the next time it will
    recommence with downloading the diffs again and reapplying them to the
    database. This is usually safe. The script receives two parameters:
    the sequence ID and timestamp of the last successful run. The timestamp
    may be missing in the rare case that the replication service stops
    responding after the updates have been downloaded.
    """
    base_url, seq, ts = props.get_replication_state()

    initial_local_timestamp = ts
    LOG.info("Using replication service '%s'.", base_url)
    local_db_age_sec = int((dt.datetime.now(dt.timezone.utc) - ts).total_seconds())

    repl = ReplicationServer(base_url)
    current = repl.get_state_info()
    if current is None:
        raise DBError(1, f"Cannot reach the configured replication service '{base_url}'.\n"
                         "Does the URL point to a directory containing OSM update data?")

    if seq >= current.sequence:
        LOG.info("Database already up-to-date.")
        return 0

    remote_server_age_sec = int((dt.datetime.now(dt.timezone.utc) - current.timestamp).total_seconds())
    LOG.debug("Need to apply %d sequence(s) (%d → %d), covering %s (%s sec) of changes (%s → %s)",
              current.sequence - seq, seq, current.sequence,
              pretty_format_timedelta((current.timestamp - ts).total_seconds()),
              int((current.timestamp - ts).total_seconds()),
              osm_date(ts.astimezone(dt.timezone.utc)),
              osm_date(current.timestamp.astimezone(dt.timezone.utc))
              )

    update_started = dt.datetime.now(dt.timezone.utc)

    if args.diff_file is not None:
        outfile = Path(args.diff_file)
    else:
        tmpdir = tempfile.TemporaryDirectory()
        outfile = Path(tmpdir.name) / 'osm2pgsql_diff.osc.gz'

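    # The resulting call looks roughly like this (illustrative):
    #   osm2pgsql --append --slim --prefix planet_osm -d <db> <diff-file>
    # plus any extra parameters given after '--'.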
    osm2pgsql = [args.osm2pgsql_cmd, '--append', '--slim', '--prefix', args.prefix]
    osm2pgsql.extend(args.extra_params)
    if args.middle_schema:
        osm2pgsql.extend(('--middle-schema', args.middle_schema))
    if args.schema:
        osm2pgsql.extend(('--schema', args.schema))
    if args.database:
        osm2pgsql.extend(('-d', args.database))
    if args.username:
        osm2pgsql.extend(('-U', args.username))
    if args.host:
        osm2pgsql.extend(('-H', args.host))
    if args.port:
        osm2pgsql.extend(('-P', args.port))
    osm2pgsql.append(str(outfile))
    LOG.debug("Calling osm2pgsql with: %s", ' '.join(osm2pgsql))

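    # Download and apply diffs in chunks of at most --max-diff-size MB until
    # the server's current sequence is reached (or only once with --once).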
    while seq < current.sequence:
        LOG.debug("Importing from sequence %d", seq)
        if outfile.exists():
            outfile.unlink()
        outhandler = WriteHandler(str(outfile))
        endseq = repl.apply_diffs(outhandler, seq + 1,
                                  max_size=args.max_diff_size * 1024)
        outhandler.close()

        if endseq is None:
            LOG.debug("No new diffs found.")
            break

        subprocess.run(osm2pgsql, check=True)
        seq = endseq

        nextstate = repl.get_state_info(seq)
        if nextstate:
            timestamp = nextstate.timestamp
        else:
            # Can't get state information for some reason, get the timestamp from file.
            timestamp = newest_change_from_file(str(outfile))
            if timestamp <= dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc):
                timestamp = None

        if args.post_processing:
            cmd = [args.post_processing, str(endseq), str(timestamp or '')]
            LOG.debug('Calling post-processing script: %s', ' '.join(cmd))
            subprocess.run(cmd, check=True)

        props.write_replication_state(base_url, seq, timestamp)

        if timestamp is not None:
            LOG.info("Data imported until %s. Backlog remaining: %s",
                     osm_date(timestamp.astimezone(dt.timezone.utc)),
                     pretty_format_timedelta((dt.datetime.now(dt.timezone.utc) - timestamp).total_seconds()),
                     )

        if args.once:
            break

    update_duration_sec = (dt.datetime.now(dt.timezone.utc) - update_started).total_seconds()
    _base_url, _seq, current_local_timestamp = props.get_replication_state()

    total_applied_changes_duration_sec = (current_local_timestamp - initial_local_timestamp).total_seconds()
    LOG.debug("It took %s (%d sec) to apply %s (%d sec) of changes. This is a speed of ×%.1f.",
              pretty_format_timedelta(update_duration_sec), int(update_duration_sec),
              pretty_format_timedelta(total_applied_changes_duration_sec), int(total_applied_changes_duration_sec),
              total_applied_changes_duration_sec / update_duration_sec
              )

    return 0


def get_parser():
    parser = ArgumentParser(description=__doc__,
                            prog='osm2pgsql-replication',
                            formatter_class=RawDescriptionHelpFormatter)
    subs = parser.add_subparsers(title='available commands', dest='subcommand')

    default_args = ArgumentParser(add_help=False)
    group = default_args.add_argument_group('Default arguments')
    group.add_argument('-h', '--help', action='help',
                       help='Show this help message and exit')
    group.add_argument('-q', '--quiet', action='store_const', const=0,
                       dest='verbose', default=2,
                       help='Print only error messages')
    group.add_argument('-v', '--verbose', action='count', default=2,
                       help='Increase verboseness of output')
    group = default_args.add_argument_group('Database arguments',
                                            "The following arguments can be used to set the connection parameters to the\n"
                                            "osm2pgsql database. You may also use libpq environment variables to set\n"
                                            "connection parameters, see https://www.postgresql.org/docs/current/libpq-envars.html.\n"
                                            "If your database connection requires a password, use a pgpass file,\n"
                                            "see https://www.postgresql.org/docs/current/libpq-pgpass.html.")
    group.add_argument('-d', '--database', metavar='DB',
                       help='Name of PostgreSQL database to connect to or conninfo string')
    group.add_argument('-U', '--username', '--user', metavar='NAME',
                       help='PostgreSQL user name')
    group.add_argument('-H', '--host', metavar='HOST',
                       help='Database server host name or socket location')
    group.add_argument('-P', '--port', metavar='PORT',
                       help='Database server port')
    group.add_argument('-p', '--prefix', metavar='PREFIX', default='planet_osm',
                       help="Prefix for table names (default 'planet_osm')")
    group.add_argument('--middle-schema', metavar='SCHEMA', default=None,
                       help='Name of the schema to store the table for the replication state in')
    group.add_argument('--schema', metavar='SCHEMA', default=None,
                       help='Name of the schema for the database')

    # Arguments for init
    cmd = subs.add_parser('init', parents=[default_args],
                          help=init.__doc__.split('\n', 1)[0],
                          description=dedent(init.__doc__),
                          formatter_class=RawDescriptionHelpFormatter,
                          add_help=False)
    grp = cmd.add_argument_group('Replication source arguments')
    srcgrp = grp.add_mutually_exclusive_group()
    srcgrp.add_argument('--osm-file', metavar='FILE',
                        help='Get replication information from the given file.')
    srcgrp.add_argument('--server', metavar='URL',
                        help='Use replication server at the given URL')
    grp.add_argument('--start-at', metavar='TIME', type=start_point,
                     help='Time when to start replication. When an absolute timestamp '
                          '(in ISO format) is given, it will be used. If a number '
                          'is given, then replication starts the number of minutes '
                          'before the known date of the database.')

    cmd.set_defaults(handler=init)

    # Arguments for update
    cmd = subs.add_parser('update', parents=[default_args],
                          usage='%(prog)s update [options] [-- param [param ...]]',
                          help=update.__doc__.split('\n', 1)[0],
                          description=dedent(update.__doc__),
                          formatter_class=RawDescriptionHelpFormatter,
                          add_help=False)
    cmd.set_defaults(handler=update)
    cmd.add_argument('extra_params', nargs='*', metavar='param',
                     help='Extra parameters to hand in to osm2pgsql.')
    grp = cmd.add_argument_group('Update process arguments')
    cmd.add_argument('--diff-file', metavar='FILE',
                     help='File to save changes before they are applied to osm2pgsql.')
    cmd.add_argument('--max-diff-size', type=int, default=500,
                     help='Maximum data to load in MB (default: 500MB)')
    cmd.add_argument('--osm2pgsql-cmd', default=str(OSM2PGSQL_PATH),
                     help='Path to osm2pgsql command')
    cmd.add_argument('--once', action='store_true',
                     help='Run updates only once, even when more data is available.')
    cmd.add_argument('--post-processing', metavar='SCRIPT',
                     help='Post-processing script to run after each execution of osm2pgsql.')

    # Arguments for status
    cmd = subs.add_parser('status', parents=[default_args],
                          help=status.__doc__.split('\n', 1)[0],
                          description=dedent(status.__doc__),
                          formatter_class=RawDescriptionHelpFormatter,
                          add_help=False)
    cmd.add_argument('--json', action="store_true", default=False, help="Output status as json.")
    cmd.set_defaults(handler=status)

    return parser


def main(prog_args=None):
    parser = get_parser()
    try:
        args = parser.parse_args(args=prog_args)
    except SystemExit:
        return 1

    if missing_modules:
        LOG.fatal("Missing required Python libraries %(mods)s.\n\n"
                  "To install them via pip run: pip install %(mods)s\n\n"
                  "ERROR: libraries could not be loaded.",
                  dict(mods=" ".join(missing_modules)))
        return 1

    if args.subcommand is None:
        parser.print_help()
        return 1

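    # Map verbosity to log level: default INFO, -v for DEBUG, -q for errors only.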
    logging.basicConfig(stream=sys.stderr,
                        format='{asctime} [{levelname}]: {message}',
                        style='{',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=max(4 - args.verbose, 1) * 10)

    prop_table_schema = args.middle_schema or args.schema or 'public'

    with DBConnection(prop_table_schema, args) as db:
        if db.table_exists(Osm2pgsqlProperties.PROP_TABLE_NAME):
            props = Osm2pgsqlProperties(db)
        else:
            props = LegacyProperties(db, args.prefix)

        if not props.is_updatable:
            LOG.fatal(f'osm2pgsql middle table "{prop_table_schema}.{args.prefix}_ways" not found in database "{db.name}". '
                      'Database needs to be imported in --slim mode.')
            return 1

        try:
            return args.handler(props, args)
        except DBError as err:
            LOG.fatal(err.msg)

    return 1


if __name__ == '__main__':
    try:
        retcode = main()
    except Exception as ex:
        LOG.fatal("Exception during execution: %s", ex)
        retcode = 3

    sys.exit(retcode)