Zur Synchronisation eines Starface Adressbuches mit einem CRM System ist ein Zugriff auf die SQL Datenbank des CRM erforderlich.
Hierzu gibt es kostenpflichtige Module, die einige Nachteile mit sich bringen:
- Sie werden i.d.R. mit jedem großen Starface Update zerschossen und erfordern manuelle Nacharbeiten bei jedem größeren Starface Update
- Der Preis: hohe, monatliche Beträge, die mit der Anzahl der Kontakte skalieren sind genauso vertreten wie Module mit dreistelligem Einmalbetrag zzgl. jährlicher Wartungsgebühren.
- Sie funktionieren nicht mit einer kostenfreien Starface Free Lizenz
Das folgende Python Skript übernimmt die Synchronisation. Es läuft direkt auf dem Linux der Starface und benötigt Netzwerkzugriff auf die SQL Datenbank. Das funktioniert i.d.R. sehr lange Updatestabil.
Hinweise:
- Die Synchronisation funktioniert ebenfalls mit Carddav-Quellen wie Nextcloud/Owncload oder einem Synology NAS.
- Der Carddav Adressbuch-Link muss der vollständige Link zum Adressbuch sein, nicht nur der zur Haupt-URL der Nextcloud Instanz wie im Quellcode ggf. suggeriert wird. Ggf. schauen Sie in ihrem Android Handy mit DAVx5 nach, dort steht der genaue Link.
- Jetzt haben Sie schon so weit gelesen, aber: Ggf. müssen Sie ihre Kontakte gar nicht in die Starface importieren. Sie könnten z.B. ein LDAP Directory betreiben, in welchem sie ihre Kontakte ablegen
- Sie können das Programm auch unter Windows laufen lassen, benötigen dann jedoch eine Python Laufzeitumgebung.
- Ich biete das Skript für "Techies" hier als Ideengeber kostenfrei und ohne Gewährleistung oder Haftung zum Download. Sofern Sie an Support interessiert sind, können Sie ein Support Paket erwerben. Dieses beinhaltet Beratung, Einrichtung und übernahme einer Gewährleistung. Der Kostenpunkt liegt bei 400 € netto - ohne Folgekosten.
Um das Skript auf einer Starface Telefonanlage zu installieren wird zuerst eine SSH Verbindung auf die Starface hergestellt und dann die folgenden Pakete installiert:
dnf install python3-pip
pip3 install --upgrade pip
pip3 install vobject requests lxml
touch starface_addr.py
chmod +x starface_addr.py
# Nun das starface_addr.py Skript einfügen
#STRG+INS zum einfügen der Zwischenablage, STRG + C im Anschluss zum beenden. Letzte Zeile muss eine leere Zeile sein
cat > starface_addr.py
# Nur für SQL: Neuere Versionen machen Probleme, daher die alte Version installieren
pip install pymssql==2.1.4
# Nur für VCard
pip install vobject
Der Aufruf des Skriptes erfolgt mit Angabe von Input- und Output-Methode. Soll von SQL nach Starface übertragen werden, entsprechend:
./starface_addr.py -i sql -o starfaceUm Daten von Carddav zu holen und testweise auf der Kommandozeile auszugeben wäre der Aufruf:
./starface_addr.py -i carddav -o printRegelmäßiges Ausführen mit Cron via 'crontab -e'. Folgende Zeile einfügen für die tägliche Synchronisation um 07:00 Uhr morgens von Carddav nach Starface einfügen. An eigene Bedürfnisse dann individuell anpassen:
0 7 * * * PATH="$PATH:/usr/sbin:/usr/local/bin/" /root/starface_addr.py -i carddav -o starface > /dev/null
starface_add.cfg : Beispielconfig, liegt im gleichen Ordner wie das Skript
# Konfigurationsdatei fuer starface_addr.py
# Diese Datei muss neben dem Skript liegen.
# WICHTIG: Enthaelt Passwoerter -> chmod 600 starface_addr.cfg
[sql]
sqlServer = 192.168.178.250:49761
sqlDB = repdoc
sqlUser = sa
sqlPass = XXX
sqlTableName = Kunden
[carddav]
cdavUrl = https://nextcloud.krause-computer.de/remote.php/dav/addressbooks/users/Christian/contacts
cdavUser = nextclouduser
cdavPass = geheim123
cdavAuth = basic
cdavVerify = True
[starface]
sfProto = https
sfServer = 192.168.178.15
sfUser = 0001
sfPass = admpassword
[phone]
country = +49
city = 2133
starface_addr.py
#!/usr/bin/env python3
# Version 2.2 (Optional SQL/CardDAV configuration)
# Christian Krause
import io, csv, requests, json, sys, hashlib, re, argparse, configparser, urllib3
import os
import lxml.etree as ET
from urllib.parse import urlparse
# ==========================================
# Konfiguration laden
# ==========================================
# Die Konfiguration liegt in der Datei 'starface_addr.cfg' direkt neben
# diesem Skript. Werte werden 1:1 in die Original-Variablennamen geladen,
# sodass die übrige Programmlogik unverändert bleibt.
#
# Nur die tatsächlich genutzte Quelle muss konfiguriert sein:
# - Wer nur CardDAV nutzt, kann die Sektion [sql] weglassen oder leer lassen.
# - Wer nur SQL nutzt, kann die Sektion [carddav] weglassen oder leer lassen.
# Geprüft wird erst beim Aufruf mit '-i sql' bzw. '-i carddav'.
def _load_config():
cfg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'starface_addr.cfg')
if not os.path.isfile(cfg_path):
sys.stderr.write(
f"FEHLER: Konfigurationsdatei nicht gefunden: {cfg_path}\n"
f"Bitte 'starface_addr.cfg' neben dem Skript anlegen.\n"
f"Empfehlung: chmod 600 starface_addr.cfg (enthält Passwoerter)\n"
)
sys.exit(1)
cp = configparser.RawConfigParser()
cp.read(cfg_path, encoding='utf-8')
return cp
_cfg = _load_config()
def _cfg_get(section, key, default=''):
"""Liest einen Konfigwert. Fehlende Sektionen oder Schluessel ergeben 'default'."""
if _cfg.has_option(section, key):
return _cfg.get(section, key)
return default
def _cfg_getbool(section, key, default=False):
"""Wie _cfg_get, aber Boolean. Akzeptiert true/false/yes/no/on/off (case-insensitive)."""
if not _cfg.has_option(section, key):
return default
val = _cfg.get(section, key).strip().lower()
return val in ('1', 'true', 'yes', 'on')
def _require(section, keys, mode_name):
"""Prueft, dass alle benoetigten Schluessel gesetzt und nicht leer sind.
Bricht sonst mit klarer Fehlermeldung ab."""
missing = [k for k in keys if not _cfg_get(section, k).strip()]
if missing:
sys.stderr.write(
f"FEHLER: Fuer Modus '{mode_name}' fehlen Werte in der Konfiguration:\n"
f" Sektion [{section}]: {', '.join(missing)}\n"
f"Bitte starface_addr.cfg ergaenzen.\n"
)
sys.exit(1)
# SQL Server (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
sqlServer = _cfg_get('sql', 'sqlServer')
sqlDB = _cfg_get('sql', 'sqlDB')
sqlUser = _cfg_get('sql', 'sqlUser')
sqlPass = _cfg_get('sql', 'sqlPass')
sqlTableName = _cfg_get('sql', 'sqlTableName')
# Carddav (Nextcloud, Owncloud) - nur lesen, nicht erzwingen
cdavUrl = _cfg_get('carddav', 'cdavUrl')
cdavUser = _cfg_get('carddav', 'cdavUser')
cdavPass = _cfg_get('carddav', 'cdavPass')
cdavAuth = _cfg_get('carddav', 'cdavAuth', default='basic')
cdavVerify = _cfg_getbool('carddav', 'cdavVerify', default=True)
# Starface (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
sfProto = _cfg_get('starface', 'sfProto')
sfServer = _cfg_get('starface', 'sfServer')
sfUser = _cfg_get('starface', 'sfUser')
sfPass = _cfg_get('starface', 'sfPass')
# Default Vorwahl (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
country = _cfg_get('phone', 'country')
city = _cfg_get('phone', 'city')
# Spaltenzuweisung
sqlTables = {
"Vorname": ["Vorname [contact:firstname]" , False, '0', "Name2"],
"Nachname": ["Name [contact:familyname]" , False, '1', "Name"],
"Firma": ["Firma [contact:company]" , False, '2', ""],
"Telefon": ["Rufnummer [telephone:phone]" , True , '3', "Telefon"],
"Telefon2": ["Privat [telephone:homephone]" , True , '4', "ZusatzTel"],
"Mobil": ["Mobil [telephone:mobile]" , True , '5', "Mobil"],
"Fax": ["Fax [telephone:fax]" , True , '6', "Telefax"],
"eMail": ["E-Mail [email:e-mail]" , False, '7', ""],
"PLZ": ["PLZ [address:postcode]" , False, '8', ""],
"Stadt": ["Stadt [address:city]" , False, '9', ""],
"Strasse": ["Straße [address:street]" , False, '10', ""],
}
# Regex Vorkompilierung
RE_00 = re.compile(r'^00')
RE_0 = re.compile(r'^0([1-9])')
RE_NON_ZERO = re.compile(r'^([1-9])')
RE_SPACES = re.compile(r' +')
# ==========================================
# Integrierter CardDAV Reader (Bulk-Optimiert)
# ==========================================
class CardDAVReader:
def __init__(self, resource, user='', passwd='', verify=True, auth='basic'):
if not verify:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
split_url = urlparse(resource)
self.url_resource = resource
self.url_base = f"{split_url.scheme}://{split_url.netloc}"
self.session = requests.session()
self._settings = {'verify': verify}
if auth == 'basic':
self._settings['auth'] = (user, passwd)
elif auth == 'digest':
from requests.auth import HTTPDigestAuth
self._settings['auth'] = HTTPDigestAuth(user, passwd)
self.headers = {"User-Agent": "starface-sync/2.0"}
# Validierung des Endpunkts
resp = self.session.request('PROPFIND', self.url_resource, headers=self.headers, **self._settings)
resp.raise_for_status()
def get_abook(self):
headers = dict(self.headers)
headers['Depth'] = '1'
resp = self.session.request('PROPFIND', self.url_resource, headers=headers, **self._settings)
resp.raise_for_status()
namespace = "{DAV:}"
try:
element = ET.fromstring(resp.content)
except ET.XMLSyntaxError:
return {}
abook = {}
for response in element.findall(f"{namespace}response"):
href_elem = response.find(f"{namespace}href")
if href_elem is None or not href_elem.text: continue
href = href_elem.text
propstat = response.find(f"{namespace}propstat")
if propstat is not None:
prop = propstat.find(f"{namespace}prop")
if prop is not None:
getcontenttype = prop.find(f"{namespace}getcontenttype")
if getcontenttype is not None and getcontenttype.text and "vcard" in getcontenttype.text.lower():
getetag = prop.find(f"{namespace}getetag")
abook[href] = getetag.text if (getetag is not None and getetag.text) else ""
return abook
def get_multiple_vcards(self, hrefs, chunk_size=100):
if not hrefs: return {}
vcards = {}
for i in range(0, len(hrefs), chunk_size):
chunk = hrefs[i:i + chunk_size]
# XML für Nextcloud (addressbook-multiget mit <allprop/>)
nsmap = {'d': 'DAV:', 'c': 'urn:ietf:params:xml:ns:carddav'}
root = ET.Element(f"{{urn:ietf:params:xml:ns:carddav}}addressbook-multiget", nsmap=nsmap)
prop = ET.SubElement(root, f"{{DAV:}}prop")
ET.SubElement(prop, f"{{DAV:}}getetag")
addr_data = ET.SubElement(prop, f"{{urn:ietf:params:xml:ns:carddav}}address-data")
ET.SubElement(addr_data, f"{{urn:ietf:params:xml:ns:carddav}}allprop")
for href in chunk:
href_el = ET.SubElement(root, f"{{DAV:}}href")
href_el.text = href
payload = ET.tostring(root, encoding='utf-8', xml_declaration=True)
headers = dict(self.headers)
headers['Depth'] = '1'
headers['Content-Type'] = 'application/xml; charset=utf-8'
resp = self.session.request('REPORT', self.url_resource, data=payload, headers=headers, **self._settings)
resp.raise_for_status()
element = ET.fromstring(resp.content)
for response in element.findall(f"{{DAV:}}response"):
href_elem = response.find(f"{{DAV:}}href")
if href_elem is None or not href_elem.text: continue
propstat = response.find(f"{{DAV:}}propstat")
if propstat is not None:
prop = propstat.find(f"{{DAV:}}prop")
if prop is not None:
addr_data_elem = prop.find(f"{{urn:ietf:params:xml:ns:carddav}}address-data")
if addr_data_elem is not None and addr_data_elem.text:
vcards[href_elem.text] = addr_data_elem.text.encode('utf-8')
return vcards
# ==========================================
# Main Logic
# ==========================================
def parseArgs():
c = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
c.add_argument("-C", "--config", help="Specify config file", metavar="config")
defaults = { }
args, remaining_argv = c.parse_known_args()
if args.config:
config = configparser.ConfigParser()
config.read([args.config])
defaults.update(dict(config.items("defaults")))
p = argparse.ArgumentParser(parents=[c], description='SQL/CardDAV to Starface Sync Tool')
p.set_defaults(**defaults)
p.add_argument('-v', '--version', action='version', version='%(prog)s 2.0')
p.add_argument('-i', '--input', choices=['sql', 'carddav'], required=True, help='Input Source: sql or carddav')
p.add_argument('-o', '--output', choices=['print', 'csv', 'starface'], required=True, help='Output Source')
return p.parse_args(remaining_argv)
def clean_number(number):
number = ''.join(c for c in number if c.isdigit() or c == '+')
if len(number) < 4: return ''
number = RE_00.sub('+', number)
number = RE_0.sub(country + r'\1', number)
return RE_NON_ZERO.sub(country + city + r'\1', number)
def read_sql(sqlServer, sqlUser, sqlPass, sqlDB):
csvHeader = []
queryIsNumberList = []
queryTableList = []
for tableColoum in sqlTables:
csvHeader.append(sqlTables.get(tableColoum)[0])
if sqlTables.get(tableColoum)[3]:
queryTableList.append(sqlTables.get(tableColoum)[3])
else:
queryTableList.append(f"'' AS {tableColoum}")
queryIsNumberList.append(sqlTables.get(tableColoum)[1])
queryString = ','.join(queryTableList)
def sql_generator():
import pymssql
with pymssql.connect(sqlServer, sqlUser, sqlPass, sqlDB) as conn:
with conn.cursor() as cursor:
cursor.execute('SELECT %s FROM %s'% (queryString, sqlTableName))
row = cursor.fetchone()
while row:
row = list(row)
contactWithNumber = False
# ALLE Spalten durchgehen
for index in range(len(row)):
# 1. NULL Werte aus SQL sicher in leere Strings umwandeln
if row[index] is None:
row[index] = ''
else:
# 2. Strings von versteckten Leerzeichen befreien (z.B. bei CHAR(50) Feldern)
row[index] = str(row[index]).strip()
# 3. Mehrfache Leerzeichen im String auf eines reduzieren
row[index] = RE_SPACES.sub(' ', row[index])
# 4. Wenn es eine Telefon-Spalte ist, die Nummer bereinigen
if queryIsNumberList[index] and row[index]:
row[index] = clean_number(row[index])
if row[index]: # Wenn nach der Bereinigung noch eine Nummer übrig ist
contactWithNumber = True
if contactWithNumber:
# WICHTIGER FIX: Falls das SQL-Feld "Nachname" leer ist, aber eine Firma existiert
# setzen wir die Firma als Nachnamen, damit der Key (row[1]) nicht leer ist.
if not row[1] and row[2]:
row[1] = row[2]
# Nur yielding, wenn ein Suchbegriff (Nachname/Firma) existiert
if row[1]:
yield row
row = cursor.fetchone()
return sql_generator(), csvHeader
def get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify):
import vobject
csvHeader = [sqlTables.get(col)[0] for col in sqlTables]
def carddav_generator():
cdav = CardDAVReader(cdavUrl, user=user, passwd=passwd, verify=verify, auth=auth)
abook = cdav.get_abook()
hrefs = list(abook.keys())
nCards = len(hrefs)
print(f'Fetching {nCards} cards in batches...')
vcard_batch = cdav.get_multiple_vcards(hrefs)
for href, vCards_bytes in vcard_batch.items():
vCards = vCards_bytes.decode("utf-8")
for vCard in vobject.readComponents(vCards):
row = [''] * 11
if 'n' in vCard.contents:
rawName = vCard.contents['n'][0].value
elif 'org' in vCard.contents:
rawName = vCard.contents['org'][0].value
elif 'fn' in vCard.contents:
rawName = vCard.contents['fn'][0].value
else:
continue
name = RE_SPACES.sub(' ', str(rawName).strip())
try:
row[0], row[1] = name.rsplit(" ", 1)
except:
row[1] = name
try:
numbers = [tel.value for tel in vCard.contents['tel']]
except:
continue
contactWithNumber = False
i = 3
for tel in numbers:
if i <= 6:
row[i] = clean_number(tel)
if row[i]: contactWithNumber = True
i += 1
if contactWithNumber:
yield row
return carddav_generator(), csvHeader
def csv_writer(addrIterable, csvHeader):
csvObject = io.StringIO()
writer = csv.writer(csvObject, delimiter=';')
writer.writerow(csvHeader)
for addr in addrIterable:
writer.writerow(addr)
return csvObject
class Starface:
def __init__(self, sfProto, sfServer, sfUser, sfPass):
self.url = f'{sfProto}://{sfServer}'
self.User = sfUser
self.Pass = sfPass
self.headers = {'Content-Type':'application/json', 'X-Version':'2'}
self.session = requests.session()
response = self.session.get(f'{self.url}/rest/login', headers=self.headers, verify=False)
templateJson = json.loads(response.content)
userandnonce=(self.User+templateJson['nonce']).encode(encoding='utf_8', errors='strict')
hpassword=hashlib.sha512(self.Pass.encode(encoding='utf_8', errors='strict')).hexdigest()
passwordHashed=hpassword.encode(encoding='utf_8')
hsecret = hashlib.sha512(userandnonce+passwordHashed).hexdigest().encode(encoding='utf_8')
secretCompound=self.User+':'+hsecret.decode(encoding='utf_8')
templateJson['secret'] = secretCompound
authTokenResponse = self.session.post(f'{self.url}/rest/login', data=json.dumps(templateJson), headers=self.headers)
self.headers.update({'authToken':json.loads(authTokenResponse.content)['token']})
response = self.session.get(f'{self.url}/rest/contacts/tags', data='', headers=self.headers)
self.addrBook = json.loads(response.content)
def add_or_update_contact(self, row, contact_id=None):
contact={
"blocks": [
{ 'name': 'contact',
'attributes': [
{ 'name': 'firstname', 'value': row[0] },
{ 'name': 'familyname','value': row[1] },
{ 'name': 'company', 'value': row[2] }
] },
{ "name": "address",
"attributes": [
{ 'name': 'street', 'value': row[10] },
{ 'name': 'postcode', 'value': row[8] },
{ 'name': 'city', 'value': row[9] }
] },
{ "name": "telephone",
"attributes": [
{ 'name': 'phone', 'value': row[3] },
{ 'name': 'homephone','value': row[4] },
{ 'name': 'mobile', 'value': row[5] },
{ 'name': 'fax', 'value': row[6] }
] },
{ "name": "email",
"attributes": [
{ 'name': 'e-mail', 'value': row[7] }
] }
],
"editable": "true",
"tags": [
{ 'id': self.addrBook[0]['id'],
'name': self.addrBook[0]['name'],
'alias': self.addrBook[0]['alias'] }
],
"id": contact_id if contact_id else ""
}
if contact_id:
response = self.session.put(f'{self.url}/rest/contacts/{contact_id}', data=json.dumps(contact), headers=self.headers)
print(f'Status: {response.status_code} updating: {row[1]}')
else:
response = self.session.post(f'{self.url}/rest/contacts', data=json.dumps(contact), headers=self.headers)
print(f'Status: {response.status_code} adding: {row[1]}')
def _fetch_all_contacts(self, pagesize=500):
"""
Lädt alle Starface-Kontakte einmalig per Pagination.
Laut Swagger akzeptiert /rest/contacts die Parameter 'page' und 'pagesize'.
Default-pagesize ist 20 - daher MUSS pagesize explizit gesetzt werden,
sonst kommen pro Request nur 20 Treffer zurück.
Die Antwort enthält metadata.totalPages, womit die Schleife deterministisch ist.
"""
all_contacts = []
page = 1
total_pages = 1 # wird nach erstem Request aktualisiert
while page <= total_pages:
url = f'{self.url}/rest/contacts?page={page}&pagesize={pagesize}'
response = self.session.get(url, headers=self.headers)
response.raise_for_status()
data = json.loads(response.content)
contacts = data.get('contacts', []) if isinstance(data, dict) else data
all_contacts.extend(contacts)
# totalPages aus Metadata ermitteln. Falls nicht vorhanden,
# so lange weitermachen, wie volle Seiten zurückkommen.
metadata = data.get('metadata', {}) if isinstance(data, dict) else {}
if metadata and 'totalPages' in metadata:
total_pages = metadata['totalPages']
else:
if len(contacts) < pagesize:
break
total_pages = page + 1 # noch eine Seite versuchen
page += 1
return all_contacts
def _build_cache(self, contacts):
"""
Baut eine flache Liste aller Starface-Kontakte mit bereinigten Telefonnummern.
KEIN Index - das Matching erfolgt linear über die volle Liste mit der
bewährten 'in summary'-Logik aus der ursprünglichen searchTerms-Version.
"""
cache = []
for c in contacts:
summary_list = list(c.get('summaryValues') or [])
phone_list = list(c.get('phoneNumberValues') or [])
# Defensiv: Falls die Bulk-Antwort keine Phones/Summary liefert (entgegen Spec),
# versuchen wir, sie aus blocks zu rekonstruieren.
if not summary_list or not phone_list:
for block in c.get('blocks', []) or []:
if block.get('name') == 'contact':
for attr in block.get('attributes', []) or []:
if attr.get('value'):
summary_list.append(attr['value'])
elif block.get('name') == 'telephone':
for attr in block.get('attributes', []) or []:
if attr.get('value'):
phone_list.append(attr['value'])
cleaned_phones = set()
for p in phone_list:
cp = clean_number(p)
if cp:
cleaned_phones.add(cp)
cache.append({
'id': c.get('id'),
'summary': summary_list,
'phones': cleaned_phones,
})
return cache
def _match_in_cache(self, row, cache):
"""
Sucht einen passenden Eintrag im Cache. Match-Logik wie in Sonnets
searchTerms-Version: Nachname (row[1]) muss in summary stehen, Vorname
(row[0]) wird nur geprüft, wenn in der Quelle vorhanden.
Gibt das Cache-Dict zurück oder None.
"""
for c in cache:
if row[1] in c['summary']:
if row[0] and row[0] not in c['summary']:
continue
return c
return None
def _searchterms_lookup(self, row):
"""
Fallback: Befragt Starface direkt per searchTerms, falls ein Kontakt
im Bulk-Listing nicht gefunden wurde. Gibt Liste roher Kontakt-Dicts zurück
(im selben Format wie _fetch_all_contacts), oder None bei Fehler.
"""
try:
url = f'{self.url}/rest/contacts?searchTerms={row[1]}'
response = self.session.get(url, headers=self.headers)
if response.status_code != 200:
return None
data = json.loads(response.content)
if isinstance(data, dict):
return data.get('contacts', [])
return data
except Exception:
return None
def _initial_import(self, source_rows):
"""
Initial-Import-Modus: Adressbuch ist (fast) leer, daher überspringen
wir das Matching und legen alles direkt an. Quell-Duplikate werden
weiterhin über processed_inputs gefiltert.
"""
print("Initial-Import-Modus: alle Kontakte werden direkt angelegt.")
processed_inputs = set()
added = 0
for row in source_rows:
try:
input_key = (row[0], row[1])
if input_key in processed_inputs:
continue
processed_inputs.add(input_key)
self.add_or_update_contact(row)
added += 1
except Exception as e:
print(f"Fehler bei {row[1]}: {e}")
print(f"Initial-Import abgeschlossen: {added} Kontakte angelegt.")
def transfer(self, addrIterable):
print("Starte Abgleich mit Starface...")
# 1) Einmalig alle Kontakte aus Starface holen
try:
all_contacts = self._fetch_all_contacts(pagesize=500)
sf_count = len(all_contacts)
print(f"{sf_count} Kontakte aus Starface geladen.")
except Exception as e:
print(f"FEHLER beim Bulk-Laden der Starface-Kontakte: {e}")
return
# 2) Quelle materialisieren - wir brauchen die Größe für die
# Initial-Import-Entscheidung. Bei den Größenordnungen (max ~5000
# Kontakte) ist der Speicherbedarf vernachlässigbar.
source_rows = list(addrIterable)
source_count = len(source_rows)
print(f"{source_count} Kontakte aus der Quelle gelesen.")
# 3) Initial-Import-Erkennung: Wenn Starface (fast) leer ist,
# brauchen wir keinen Abgleich, sondern legen alles an.
# Schwellenwerte: < 10 absolut ODER < 10% der Quellgröße
threshold_absolute = 10
threshold_relative = max(1, int(source_count * 0.1))
if sf_count < threshold_absolute or sf_count < threshold_relative:
print(f"Starface hat nur {sf_count} Kontakte "
f"(< {max(threshold_absolute, threshold_relative)}) - "
f"wechsle in Initial-Import-Modus.")
self._initial_import(source_rows)
return
# 4) Normaler Sync: Cache aufbauen
sf_cache = self._build_cache(all_contacts)
# 5) Lokal abgleichen, mit Fallback auf searchTerms
processed_inputs = set()
for row in source_rows:
try:
input_key = (row[0], row[1])
if input_key in processed_inputs:
continue
processed_inputs.add(input_key)
new_phones = {row[3], row[4], row[5], row[6]} - {''}
# 5a) Erst im Bulk-Cache suchen
matched = self._match_in_cache(row, sf_cache)
# 5b) Falls nicht gefunden: searchTerms-Fallback
if matched is None:
fallback_contacts = self._searchterms_lookup(row)
if fallback_contacts:
# Fallback-Treffer in Cache-Format konvertieren und
# in den Cache aufnehmen, damit Folgezeilen ihn finden
fallback_cache = self._build_cache(fallback_contacts)
matched = self._match_in_cache(row, fallback_cache)
if matched is not None:
sf_cache.append(matched)
# 5c) Match -> Update wenn nötig, sonst Nothing changed
if matched is not None:
missing_phones = new_phones - matched['phones']
if missing_phones:
self.add_or_update_contact(row, matched['id'])
matched['phones'].update(new_phones)
else:
print(f'Nothing changed: {row[1]}')
else:
# 5d) Wirklich nicht da -> neu anlegen
self.add_or_update_contact(row)
sf_cache.append({
'id': 'new',
'summary': [row[0], row[1]],
'phones': set(new_phones)
})
except Exception as e:
print(f"Fehler bei {row[1]}: {e}")
# ==========================================
# Skript Ausführung
# ==========================================
if __name__ == "__main__":
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
a = parseArgs()
# Erst alle benoetigten Sektionen pruefen, bevor teure Operationen starten
if a.input == 'sql':
_require('sql', ['sqlServer', 'sqlDB', 'sqlUser', 'sqlPass', 'sqlTableName'], 'sql')
_require('phone', ['country', 'city'], 'phone')
elif a.input == 'carddav':
_require('carddav', ['cdavUrl', 'cdavUser', 'cdavPass'], 'carddav')
if a.output == 'starface':
_require('starface', ['sfProto', 'sfServer', 'sfUser', 'sfPass'], 'starface')
_require('phone', ['country', 'city'], 'phone')
# Jetzt die eigentliche Verarbeitung
if a.input == 'sql':
addrIterable, csvHeader = read_sql(sqlServer, sqlUser, sqlPass, sqlDB)
elif a.input == 'carddav':
addrIterable, csvHeader = get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify)
if a.output == 'print':
csvObject = csv_writer(addrIterable, csvHeader)
print(csvObject.getvalue())
elif a.output == 'csv':
csvObject = csv_writer(addrIterable, csvHeader)
with open('sql_output.csv', 'w', encoding="utf-8") as file:
file.write(csvObject.getvalue())
print('sql_output.csv written')
elif a.output == 'starface':
S = Starface(sfProto, sfServer, sfUser, sfPass)
S.transfer(addrIterable)
Das Skript kann auch auf anderen Linux-Umgebungen (z.B. einem Proxmox Hypervisor) installiert werden. Es sind inkompatibilitäten mit Python 3.13 (Proxmox 9) bekannt: Die Rest Schnittstelle funktioniert nur noch, sofern die Starface über ein nicht selbst-signiertes Zertifikat verfügt. Wer eine schnelle Lösung hat, gerne per E-Mail zu mir.