Starface Adressbuch: SQL / CardDav Import via REST API

Zur Synchronisation eines Starface Adressbuches mit einem CRM System ist ein Zugriff auf die SQL Datenbank des CRM erforderlich.

Hierzu gibt es kostenpflichtige Module, die einige Nachteile mit sich bringen:

  • Sie werden i.d.R. mit jedem großen Starface Update zerschossen und erfordern manuelle Nacharbeiten bei jedem größeren Starface Update
  • Der Preis: hohe, monatliche Beträge, die mit der Anzahl der Kontakte skalieren sind genauso vertreten wie Module mit dreistelligem Einmalbetrag zzgl. jährlicher Wartungsgebühren.
  • Sie funktionieren nicht mit einer kostenfreien Starface Free Lizenz

Das folgende Python Skript übernimmt die Synchronisation. Es läuft direkt auf dem Linux der Starface und benötigt Netzwerkzugriff auf die SQL Datenbank. Das funktioniert i.d.R. sehr lange Updatestabil.

Hinweise:

  • Die Synchronisation funktioniert ebenfalls mit Carddav-Quellen wie Nextcloud/Owncload oder einem Synology NAS.
  • Der Carddav Adressbuch-Link muss der vollständige Link zum Adressbuch sein, nicht nur der zur Haupt-URL der Nextcloud Instanz wie im Quellcode ggf. suggeriert wird. Ggf. schauen Sie in ihrem Android Handy mit DAVx5 nach, dort steht der genaue Link.
  • Jetzt haben Sie schon so weit gelesen, aber: Ggf. müssen Sie ihre Kontakte gar nicht in die Starface importieren. Sie könnten z.B. ein LDAP Directory betreiben, in welchem sie ihre Kontakte ablegen
  • Sie können das Programm auch unter Windows laufen lassen, benötigen dann jedoch eine Python Laufzeitumgebung.
  • Ich biete das Skript für "Techies" hier als Ideengeber kostenfrei und ohne Gewährleistung oder Haftung zum Download. Sofern Sie an Support interessiert sind, können Sie ein Support Paket erwerben. Dieses beinhaltet Beratung, Einrichtung und übernahme einer Gewährleistung. Der Kostenpunkt liegt bei 400 € netto - ohne Folgekosten.

 

Um das Skript auf einer Starface Telefonanlage zu installieren wird zuerst eine SSH Verbindung auf die Starface hergestellt und dann die folgenden Pakete installiert:

dnf install python3-pip
pip3 install --upgrade pip
pip3 install vobject requests lxml
touch starface_addr.py
chmod +x starface_addr.py

# Nun das starface_addr.py Skript einfügen
#STRG+INS zum einfügen der Zwischenablage, STRG + C im Anschluss zum beenden. Letzte Zeile muss eine leere Zeile sein
cat > starface_addr.py


 # Nur für SQL: Neuere Versionen machen Probleme, daher die alte Version installieren
pip install pymssql==2.1.4


# Nur für VCard
pip install vobject 


 

Der Aufruf des Skriptes erfolgt mit Angabe von Input- und Output-Methode. Soll von SQL nach Starface übertragen werden, entsprechend:

 ./starface_addr.py -i sql -o starface

Um Daten von Carddav zu holen und testweise auf der Kommandozeile auszugeben wäre der Aufruf:

 ./starface_addr.py -i carddav -o print

Regelmäßiges Ausführen mit Cron via 'crontab -e'. Folgende Zeile einfügen für die tägliche Synchronisation um 07:00 Uhr morgens von Carddav nach Starface einfügen. An eigene Bedürfnisse dann individuell anpassen:

0 7 * * * PATH="$PATH:/usr/sbin:/usr/local/bin/" /root/starface_addr.py -i carddav -o starface > /dev/null

starface_add.cfg : Beispielconfig, liegt im gleichen Ordner wie das Skript

# Konfigurationsdatei fuer starface_addr.py
# Diese Datei muss neben dem Skript liegen.
# WICHTIG: Enthaelt Passwoerter -> chmod 600 starface_addr.cfg

[sql]
sqlServer = 192.168.178.250:49761
sqlDB = repdoc
sqlUser = sa
sqlPass = XXX
sqlTableName = Kunden

[carddav]
cdavUrl = https://nextcloud.krause-computer.de/remote.php/dav/addressbooks/users/Christian/contacts
cdavUser = nextclouduser
cdavPass = geheim123
cdavAuth = basic
cdavVerify = True

[starface]
sfProto = https
sfServer = 192.168.178.15
sfUser = 0001
sfPass = admpassword

[phone]
country = +49
city = 2133

starface_addr.py

#!/usr/bin/env python3
# Version 2.2 (Optional SQL/CardDAV configuration)
# Christian Krause

import io, csv, requests, json, sys, hashlib, re, argparse, configparser, urllib3
import os
import lxml.etree as ET
from urllib.parse import urlparse

# ==========================================
# Konfiguration laden
# ==========================================
# Die Konfiguration liegt in der Datei 'starface_addr.cfg' direkt neben
# diesem Skript. Werte werden 1:1 in die Original-Variablennamen geladen,
# sodass die übrige Programmlogik unverändert bleibt.
#
# Nur die tatsächlich genutzte Quelle muss konfiguriert sein:
# - Wer nur CardDAV nutzt, kann die Sektion [sql] weglassen oder leer lassen.
# - Wer nur SQL nutzt, kann die Sektion [carddav] weglassen oder leer lassen.
# Geprüft wird erst beim Aufruf mit '-i sql' bzw. '-i carddav'.

def _load_config():
    cfg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'starface_addr.cfg')
    if not os.path.isfile(cfg_path):
        sys.stderr.write(
            f"FEHLER: Konfigurationsdatei nicht gefunden: {cfg_path}\n"
            f"Bitte 'starface_addr.cfg' neben dem Skript anlegen.\n"
            f"Empfehlung: chmod 600 starface_addr.cfg (enthält Passwoerter)\n"
        )
        sys.exit(1)
    cp = configparser.RawConfigParser()
    cp.read(cfg_path, encoding='utf-8')
    return cp

_cfg = _load_config()

def _cfg_get(section, key, default=''):
    """Liest einen Konfigwert. Fehlende Sektionen oder Schluessel ergeben 'default'."""
    if _cfg.has_option(section, key):
        return _cfg.get(section, key)
    return default

def _cfg_getbool(section, key, default=False):
    """Wie _cfg_get, aber Boolean. Akzeptiert true/false/yes/no/on/off (case-insensitive)."""
    if not _cfg.has_option(section, key):
        return default
    val = _cfg.get(section, key).strip().lower()
    return val in ('1', 'true', 'yes', 'on')

def _require(section, keys, mode_name):
    """Prueft, dass alle benoetigten Schluessel gesetzt und nicht leer sind.
    Bricht sonst mit klarer Fehlermeldung ab."""
    missing = [k for k in keys if not _cfg_get(section, k).strip()]
    if missing:
        sys.stderr.write(
            f"FEHLER: Fuer Modus '{mode_name}' fehlen Werte in der Konfiguration:\n"
            f"  Sektion [{section}]: {', '.join(missing)}\n"
            f"Bitte starface_addr.cfg ergaenzen.\n"
        )
        sys.exit(1)

# SQL Server (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
sqlServer    = _cfg_get('sql', 'sqlServer')
sqlDB        = _cfg_get('sql', 'sqlDB')
sqlUser      = _cfg_get('sql', 'sqlUser')
sqlPass      = _cfg_get('sql', 'sqlPass')
sqlTableName = _cfg_get('sql', 'sqlTableName')

# Carddav (Nextcloud, Owncloud) - nur lesen, nicht erzwingen
cdavUrl    = _cfg_get('carddav', 'cdavUrl')
cdavUser   = _cfg_get('carddav', 'cdavUser')
cdavPass   = _cfg_get('carddav', 'cdavPass')
cdavAuth   = _cfg_get('carddav', 'cdavAuth', default='basic')
cdavVerify = _cfg_getbool('carddav', 'cdavVerify', default=True)

# Starface (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
sfProto  = _cfg_get('starface', 'sfProto')
sfServer = _cfg_get('starface', 'sfServer')
sfUser   = _cfg_get('starface', 'sfUser')
sfPass   = _cfg_get('starface', 'sfPass')

# Default Vorwahl (nur lesen, nicht erzwingen - Pruefung beim Modus-Aufruf)
country = _cfg_get('phone', 'country')
city    = _cfg_get('phone', 'city')

# Spaltenzuweisung
sqlTables = {
    "Vorname":  ["Vorname [contact:firstname]"  , False, '0', "Name2"],
    "Nachname": ["Name [contact:familyname]"    , False, '1', "Name"],
    "Firma":    ["Firma [contact:company]"      , False, '2', ""],
    "Telefon":  ["Rufnummer [telephone:phone]"  , True , '3', "Telefon"],
    "Telefon2": ["Privat [telephone:homephone]" , True , '4', "ZusatzTel"],
    "Mobil":    ["Mobil [telephone:mobile]"     , True , '5', "Mobil"],
    "Fax":      ["Fax [telephone:fax]"          , True , '6', "Telefax"],
    "eMail":    ["E-Mail [email:e-mail]"        , False, '7', ""],
    "PLZ":      ["PLZ [address:postcode]"       , False, '8', ""],
    "Stadt":    ["Stadt [address:city]"         , False, '9', ""],
    "Strasse":  ["Straße [address:street]"      , False, '10', ""],
}

# Regex Vorkompilierung
RE_00 = re.compile(r'^00')
RE_0 = re.compile(r'^0([1-9])')
RE_NON_ZERO = re.compile(r'^([1-9])')
RE_SPACES = re.compile(r' +')

# ==========================================
# Integrierter CardDAV Reader (Bulk-Optimiert)
# ==========================================

class CardDAVReader:
    def __init__(self, resource, user='', passwd='', verify=True, auth='basic'):
        if not verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        split_url = urlparse(resource)
        self.url_resource = resource
        self.url_base = f"{split_url.scheme}://{split_url.netloc}"

        self.session = requests.session()
        self._settings = {'verify': verify}
        
        if auth == 'basic':
            self._settings['auth'] = (user, passwd)
        elif auth == 'digest':
            from requests.auth import HTTPDigestAuth
            self._settings['auth'] = HTTPDigestAuth(user, passwd)
            
        self.headers = {"User-Agent": "starface-sync/2.0"}
        
        # Validierung des Endpunkts
        resp = self.session.request('PROPFIND', self.url_resource, headers=self.headers, **self._settings)
        resp.raise_for_status()

    def get_abook(self):
        headers = dict(self.headers)
        headers['Depth'] = '1'
        resp = self.session.request('PROPFIND', self.url_resource, headers=headers, **self._settings)
        resp.raise_for_status()
        
        namespace = "{DAV:}"
        try:
            element = ET.fromstring(resp.content)
        except ET.XMLSyntaxError:
            return {}

        abook = {}
        for response in element.findall(f"{namespace}response"):
            href_elem = response.find(f"{namespace}href")
            if href_elem is None or not href_elem.text: continue
                
            href = href_elem.text
            propstat = response.find(f"{namespace}propstat")
            if propstat is not None:
                prop = propstat.find(f"{namespace}prop")
                if prop is not None:
                    getcontenttype = prop.find(f"{namespace}getcontenttype")
                    if getcontenttype is not None and getcontenttype.text and "vcard" in getcontenttype.text.lower():
                        getetag = prop.find(f"{namespace}getetag")
                        abook[href] = getetag.text if (getetag is not None and getetag.text) else ""
        return abook

    def get_multiple_vcards(self, hrefs, chunk_size=100):
        if not hrefs: return {}
        vcards = {}

        for i in range(0, len(hrefs), chunk_size):
            chunk = hrefs[i:i + chunk_size]
            
            # XML für Nextcloud (addressbook-multiget mit <allprop/>)
            nsmap = {'d': 'DAV:', 'c': 'urn:ietf:params:xml:ns:carddav'}
            root = ET.Element(f"{{urn:ietf:params:xml:ns:carddav}}addressbook-multiget", nsmap=nsmap)
            
            prop = ET.SubElement(root, f"{{DAV:}}prop")
            ET.SubElement(prop, f"{{DAV:}}getetag")
            addr_data = ET.SubElement(prop, f"{{urn:ietf:params:xml:ns:carddav}}address-data")
            ET.SubElement(addr_data, f"{{urn:ietf:params:xml:ns:carddav}}allprop")
            
            for href in chunk:
                href_el = ET.SubElement(root, f"{{DAV:}}href")
                href_el.text = href

            payload = ET.tostring(root, encoding='utf-8', xml_declaration=True)
            
            headers = dict(self.headers)
            headers['Depth'] = '1'
            headers['Content-Type'] = 'application/xml; charset=utf-8'

            resp = self.session.request('REPORT', self.url_resource, data=payload, headers=headers, **self._settings)
            resp.raise_for_status()

            element = ET.fromstring(resp.content)
            for response in element.findall(f"{{DAV:}}response"):
                href_elem = response.find(f"{{DAV:}}href")
                if href_elem is None or not href_elem.text: continue
                
                propstat = response.find(f"{{DAV:}}propstat")
                if propstat is not None:
                    prop = propstat.find(f"{{DAV:}}prop")
                    if prop is not None:
                        addr_data_elem = prop.find(f"{{urn:ietf:params:xml:ns:carddav}}address-data")
                        if addr_data_elem is not None and addr_data_elem.text:
                            vcards[href_elem.text] = addr_data_elem.text.encode('utf-8')
        return vcards

# ==========================================
# Main Logic
# ==========================================

def parseArgs():
    c = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
    c.add_argument("-C", "--config", help="Specify config file", metavar="config")
    defaults = { }
    args, remaining_argv = c.parse_known_args()

    if args.config:
        config = configparser.ConfigParser()
        config.read([args.config])
        defaults.update(dict(config.items("defaults")))

    p = argparse.ArgumentParser(parents=[c], description='SQL/CardDAV to Starface Sync Tool')
    p.set_defaults(**defaults)
    p.add_argument('-v', '--version', action='version', version='%(prog)s 2.0')
    p.add_argument('-i', '--input', choices=['sql', 'carddav'], required=True, help='Input Source: sql or carddav')
    p.add_argument('-o', '--output', choices=['print', 'csv', 'starface'], required=True, help='Output Source')

    return p.parse_args(remaining_argv)

def clean_number(number):
    number = ''.join(c for c in number if c.isdigit() or c == '+')
    if len(number) < 4: return ''
    number = RE_00.sub('+', number)
    number = RE_0.sub(country + r'\1', number)
    return RE_NON_ZERO.sub(country + city + r'\1', number)


def read_sql(sqlServer, sqlUser, sqlPass, sqlDB):
    csvHeader = []
    queryIsNumberList = []
    queryTableList = []

    for tableColoum in sqlTables:
        csvHeader.append(sqlTables.get(tableColoum)[0])
        if sqlTables.get(tableColoum)[3]:
            queryTableList.append(sqlTables.get(tableColoum)[3])
        else:
            queryTableList.append(f"'' AS {tableColoum}")
        queryIsNumberList.append(sqlTables.get(tableColoum)[1])
    
    queryString = ','.join(queryTableList)
    
    def sql_generator():
        import pymssql
        with pymssql.connect(sqlServer, sqlUser, sqlPass, sqlDB) as conn:
            with conn.cursor() as cursor:
                cursor.execute('SELECT %s FROM %s'% (queryString, sqlTableName))
                row = cursor.fetchone()
                while row:
                    row = list(row)
                    contactWithNumber = False
                    
                    # ALLE Spalten durchgehen
                    for index in range(len(row)):
                        # 1. NULL Werte aus SQL sicher in leere Strings umwandeln
                        if row[index] is None:
                            row[index] = ''
                        else:
                            # 2. Strings von versteckten Leerzeichen befreien (z.B. bei CHAR(50) Feldern)
                            row[index] = str(row[index]).strip()
                            # 3. Mehrfache Leerzeichen im String auf eines reduzieren
                            row[index] = RE_SPACES.sub(' ', row[index])
                            
                        # 4. Wenn es eine Telefon-Spalte ist, die Nummer bereinigen
                        if queryIsNumberList[index] and row[index]:
                            row[index] = clean_number(row[index])
                            if row[index]: # Wenn nach der Bereinigung noch eine Nummer übrig ist
                                contactWithNumber = True
                                
                    if contactWithNumber:
                        # WICHTIGER FIX: Falls das SQL-Feld "Nachname" leer ist, aber eine Firma existiert
                        # setzen wir die Firma als Nachnamen, damit der Key (row[1]) nicht leer ist.
                        if not row[1] and row[2]:
                            row[1] = row[2]
                            
                        # Nur yielding, wenn ein Suchbegriff (Nachname/Firma) existiert
                        if row[1]: 
                            yield row
                            
                    row = cursor.fetchone()
                    
    return sql_generator(), csvHeader


def get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify):
    import vobject
    csvHeader = [sqlTables.get(col)[0] for col in sqlTables]
    
    def carddav_generator():
        cdav = CardDAVReader(cdavUrl, user=user, passwd=passwd, verify=verify, auth=auth)
        abook = cdav.get_abook()
        hrefs = list(abook.keys())
        nCards = len(hrefs)
        
        print(f'Fetching {nCards} cards in batches...')
        vcard_batch = cdav.get_multiple_vcards(hrefs)
        
        for href, vCards_bytes in vcard_batch.items():
            vCards = vCards_bytes.decode("utf-8")
            for vCard in vobject.readComponents(vCards):
                row = [''] * 11
                if 'n' in vCard.contents:
                    rawName = vCard.contents['n'][0].value
                elif 'org' in vCard.contents:
                    rawName = vCard.contents['org'][0].value
                elif 'fn' in vCard.contents:
                    rawName = vCard.contents['fn'][0].value
                else:
                    continue
                    
                name = RE_SPACES.sub(' ', str(rawName).strip())
                try:
                    row[0], row[1] = name.rsplit(" ", 1)
                except:
                    row[1] = name
                    
                try:
                    numbers = [tel.value for tel in vCard.contents['tel']]
                except:
                    continue
                    
                contactWithNumber = False
                i = 3
                for tel in numbers:
                    if i <= 6:
                        row[i] = clean_number(tel)
                        if row[i]: contactWithNumber = True
                        i += 1
                
                if contactWithNumber:
                    yield row
                    
    return carddav_generator(), csvHeader


def csv_writer(addrIterable, csvHeader):
    csvObject = io.StringIO()
    writer = csv.writer(csvObject, delimiter=';')
    writer.writerow(csvHeader)
    for addr in addrIterable:
        writer.writerow(addr)
    return csvObject

class Starface:
    def __init__(self, sfProto, sfServer, sfUser, sfPass):
        self.url = f'{sfProto}://{sfServer}'
        self.User = sfUser
        self.Pass = sfPass
        self.headers = {'Content-Type':'application/json', 'X-Version':'2'}
        self.session = requests.session()

        response = self.session.get(f'{self.url}/rest/login', headers=self.headers, verify=False)
        templateJson = json.loads(response.content)
        userandnonce=(self.User+templateJson['nonce']).encode(encoding='utf_8', errors='strict')
        hpassword=hashlib.sha512(self.Pass.encode(encoding='utf_8', errors='strict')).hexdigest()
        passwordHashed=hpassword.encode(encoding='utf_8')
        hsecret = hashlib.sha512(userandnonce+passwordHashed).hexdigest().encode(encoding='utf_8')
        secretCompound=self.User+':'+hsecret.decode(encoding='utf_8')
        templateJson['secret'] = secretCompound
        authTokenResponse = self.session.post(f'{self.url}/rest/login', data=json.dumps(templateJson), headers=self.headers)
        self.headers.update({'authToken':json.loads(authTokenResponse.content)['token']})
        
        response = self.session.get(f'{self.url}/rest/contacts/tags', data='', headers=self.headers)
        self.addrBook = json.loads(response.content)

    def add_or_update_contact(self, row, contact_id=None):
        contact={
        "blocks": [
            {  'name': 'contact',
               'attributes': [
                   { 'name': 'firstname', 'value': row[0] },
                   { 'name': 'familyname','value': row[1] },
                   { 'name': 'company',   'value': row[2] }
               ] },
            {  "name": "address",
               "attributes": [
                   { 'name': 'street',   'value': row[10] },
                   { 'name': 'postcode', 'value': row[8]  },
                   { 'name': 'city',     'value': row[9]  }
               ] },
            {  "name": "telephone",
               "attributes": [
                   { 'name': 'phone',    'value': row[3]  },
                   { 'name': 'homephone','value': row[4]  },
                   { 'name': 'mobile',   'value': row[5]  },
                   { 'name': 'fax',      'value': row[6]  }
               ] },
            {  "name": "email",
               "attributes": [
                   { 'name': 'e-mail',   'value': row[7]  }
               ] }
        ],
        "editable": "true",
        "tags": [
            {   'id': self.addrBook[0]['id'],
                'name': self.addrBook[0]['name'],
                'alias': self.addrBook[0]['alias'] }
        ],
        "id": contact_id if contact_id else ""
        }
        
        if contact_id:
            response = self.session.put(f'{self.url}/rest/contacts/{contact_id}', data=json.dumps(contact), headers=self.headers)
            print(f'Status: {response.status_code} updating: {row[1]}')
        else:
            response = self.session.post(f'{self.url}/rest/contacts', data=json.dumps(contact), headers=self.headers)
            print(f'Status: {response.status_code} adding: {row[1]}')
        
    def _fetch_all_contacts(self, pagesize=500):
        """
        Lädt alle Starface-Kontakte einmalig per Pagination.
        Laut Swagger akzeptiert /rest/contacts die Parameter 'page' und 'pagesize'.
        Default-pagesize ist 20 - daher MUSS pagesize explizit gesetzt werden,
        sonst kommen pro Request nur 20 Treffer zurück.
        Die Antwort enthält metadata.totalPages, womit die Schleife deterministisch ist.
        """
        all_contacts = []
        page = 1
        total_pages = 1  # wird nach erstem Request aktualisiert

        while page <= total_pages:
            url = f'{self.url}/rest/contacts?page={page}&pagesize={pagesize}'
            response = self.session.get(url, headers=self.headers)
            response.raise_for_status()
            data = json.loads(response.content)

            contacts = data.get('contacts', []) if isinstance(data, dict) else data
            all_contacts.extend(contacts)

            # totalPages aus Metadata ermitteln. Falls nicht vorhanden,
            # so lange weitermachen, wie volle Seiten zurückkommen.
            metadata = data.get('metadata', {}) if isinstance(data, dict) else {}
            if metadata and 'totalPages' in metadata:
                total_pages = metadata['totalPages']
            else:
                if len(contacts) < pagesize:
                    break
                total_pages = page + 1  # noch eine Seite versuchen

            page += 1

        return all_contacts

    def _build_cache(self, contacts):
        """
        Baut eine flache Liste aller Starface-Kontakte mit bereinigten Telefonnummern.
        KEIN Index - das Matching erfolgt linear über die volle Liste mit der
        bewährten 'in summary'-Logik aus der ursprünglichen searchTerms-Version.
        """
        cache = []

        for c in contacts:
            summary_list = list(c.get('summaryValues') or [])
            phone_list = list(c.get('phoneNumberValues') or [])

            # Defensiv: Falls die Bulk-Antwort keine Phones/Summary liefert (entgegen Spec),
            # versuchen wir, sie aus blocks zu rekonstruieren.
            if not summary_list or not phone_list:
                for block in c.get('blocks', []) or []:
                    if block.get('name') == 'contact':
                        for attr in block.get('attributes', []) or []:
                            if attr.get('value'):
                                summary_list.append(attr['value'])
                    elif block.get('name') == 'telephone':
                        for attr in block.get('attributes', []) or []:
                            if attr.get('value'):
                                phone_list.append(attr['value'])

            cleaned_phones = set()
            for p in phone_list:
                cp = clean_number(p)
                if cp:
                    cleaned_phones.add(cp)

            cache.append({
                'id': c.get('id'),
                'summary': summary_list,
                'phones': cleaned_phones,
            })

        return cache

    def _match_in_cache(self, row, cache):
        """
        Sucht einen passenden Eintrag im Cache. Match-Logik wie in Sonnets
        searchTerms-Version: Nachname (row[1]) muss in summary stehen, Vorname
        (row[0]) wird nur geprüft, wenn in der Quelle vorhanden.
        Gibt das Cache-Dict zurück oder None.
        """
        for c in cache:
            if row[1] in c['summary']:
                if row[0] and row[0] not in c['summary']:
                    continue
                return c
        return None

    def _searchterms_lookup(self, row):
        """
        Fallback: Befragt Starface direkt per searchTerms, falls ein Kontakt
        im Bulk-Listing nicht gefunden wurde. Gibt Liste roher Kontakt-Dicts zurück
        (im selben Format wie _fetch_all_contacts), oder None bei Fehler.
        """
        try:
            url = f'{self.url}/rest/contacts?searchTerms={row[1]}'
            response = self.session.get(url, headers=self.headers)
            if response.status_code != 200:
                return None
            data = json.loads(response.content)
            if isinstance(data, dict):
                return data.get('contacts', [])
            return data
        except Exception:
            return None

    def _initial_import(self, source_rows):
        """
        Initial-Import-Modus: Adressbuch ist (fast) leer, daher überspringen
        wir das Matching und legen alles direkt an. Quell-Duplikate werden
        weiterhin über processed_inputs gefiltert.
        """
        print("Initial-Import-Modus: alle Kontakte werden direkt angelegt.")
        processed_inputs = set()
        added = 0
        for row in source_rows:
            try:
                input_key = (row[0], row[1])
                if input_key in processed_inputs:
                    continue
                processed_inputs.add(input_key)
                self.add_or_update_contact(row)
                added += 1
            except Exception as e:
                print(f"Fehler bei {row[1]}: {e}")
        print(f"Initial-Import abgeschlossen: {added} Kontakte angelegt.")

    def transfer(self, addrIterable):
        print("Starte Abgleich mit Starface...")

        # 1) Einmalig alle Kontakte aus Starface holen
        try:
            all_contacts = self._fetch_all_contacts(pagesize=500)
            sf_count = len(all_contacts)
            print(f"{sf_count} Kontakte aus Starface geladen.")
        except Exception as e:
            print(f"FEHLER beim Bulk-Laden der Starface-Kontakte: {e}")
            return

        # 2) Quelle materialisieren - wir brauchen die Größe für die
        #    Initial-Import-Entscheidung. Bei den Größenordnungen (max ~5000
        #    Kontakte) ist der Speicherbedarf vernachlässigbar.
        source_rows = list(addrIterable)
        source_count = len(source_rows)
        print(f"{source_count} Kontakte aus der Quelle gelesen.")

        # 3) Initial-Import-Erkennung: Wenn Starface (fast) leer ist,
        #    brauchen wir keinen Abgleich, sondern legen alles an.
        #    Schwellenwerte: < 10 absolut ODER < 10% der Quellgröße
        threshold_absolute = 10
        threshold_relative = max(1, int(source_count * 0.1))
        if sf_count < threshold_absolute or sf_count < threshold_relative:
            print(f"Starface hat nur {sf_count} Kontakte "
                  f"(< {max(threshold_absolute, threshold_relative)}) - "
                  f"wechsle in Initial-Import-Modus.")
            self._initial_import(source_rows)
            return

        # 4) Normaler Sync: Cache aufbauen
        sf_cache = self._build_cache(all_contacts)

        # 5) Lokal abgleichen, mit Fallback auf searchTerms
        processed_inputs = set()

        for row in source_rows:
            try:
                input_key = (row[0], row[1])
                if input_key in processed_inputs:
                    continue
                processed_inputs.add(input_key)

                new_phones = {row[3], row[4], row[5], row[6]} - {''}

                # 5a) Erst im Bulk-Cache suchen
                matched = self._match_in_cache(row, sf_cache)

                # 5b) Falls nicht gefunden: searchTerms-Fallback
                if matched is None:
                    fallback_contacts = self._searchterms_lookup(row)
                    if fallback_contacts:
                        # Fallback-Treffer in Cache-Format konvertieren und
                        # in den Cache aufnehmen, damit Folgezeilen ihn finden
                        fallback_cache = self._build_cache(fallback_contacts)
                        matched = self._match_in_cache(row, fallback_cache)
                        if matched is not None:
                            sf_cache.append(matched)

                # 5c) Match -> Update wenn nötig, sonst Nothing changed
                if matched is not None:
                    missing_phones = new_phones - matched['phones']
                    if missing_phones:
                        self.add_or_update_contact(row, matched['id'])
                        matched['phones'].update(new_phones)
                    else:
                        print(f'Nothing changed: {row[1]}')
                else:
                    # 5d) Wirklich nicht da -> neu anlegen
                    self.add_or_update_contact(row)
                    sf_cache.append({
                        'id': 'new',
                        'summary': [row[0], row[1]],
                        'phones': set(new_phones)
                    })

            except Exception as e:
                print(f"Fehler bei {row[1]}: {e}")

# ==========================================
# Skript Ausführung
# ==========================================

if __name__ == "__main__":
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

    a = parseArgs()

    # Erst alle benoetigten Sektionen pruefen, bevor teure Operationen starten
    if a.input == 'sql':
        _require('sql', ['sqlServer', 'sqlDB', 'sqlUser', 'sqlPass', 'sqlTableName'], 'sql')
        _require('phone', ['country', 'city'], 'phone')
    elif a.input == 'carddav':
        _require('carddav', ['cdavUrl', 'cdavUser', 'cdavPass'], 'carddav')

    if a.output == 'starface':
        _require('starface', ['sfProto', 'sfServer', 'sfUser', 'sfPass'], 'starface')
        _require('phone', ['country', 'city'], 'phone')

    # Jetzt die eigentliche Verarbeitung
    if a.input == 'sql':
        addrIterable, csvHeader = read_sql(sqlServer, sqlUser, sqlPass, sqlDB)
    elif a.input == 'carddav':
        addrIterable, csvHeader = get_carddav(cdavUrl, user=cdavUser, passwd=cdavPass, auth=cdavAuth, verify=cdavVerify)

    if a.output == 'print':
        csvObject = csv_writer(addrIterable, csvHeader)
        print(csvObject.getvalue())

    elif a.output == 'csv':
        csvObject = csv_writer(addrIterable, csvHeader)
        with open('sql_output.csv', 'w', encoding="utf-8") as file:
            file.write(csvObject.getvalue())
            print('sql_output.csv written')

    elif a.output == 'starface':
        S = Starface(sfProto, sfServer, sfUser, sfPass)
        S.transfer(addrIterable)

 

Das Skript kann auch auf anderen Linux-Umgebungen (z.B. einem Proxmox Hypervisor) installiert werden. Es sind inkompatibilitäten mit Python 3.13 (Proxmox 9) bekannt: Die Rest Schnittstelle funktioniert nur noch, sofern die Starface über ein nicht selbst-signiertes Zertifikat verfügt. Wer eine schnelle Lösung hat, gerne per E-Mail zu mir.