[Ricorsione] Divide & Conquer su un vettore

elgiovo
Ciao ragazzi,

stavo scrivendo un programmino Matlab (ma è un argomento trasversale che va bene per quasi tutti i linguaggi) che splitta ricorsivamente un vettore. La mia difficoltà sta nel fatto che il mio algoritmo non è una semplice ricerca binaria, che "butta via" una parte del vettore e tiene l'altra. Ho infatti una funzione piuttosto complicata che agisce su un vettore e trova al più un indice in cui si verifica una certa condizione. Per fare un esempio, la funzione può essere definita come

f(v) = indice(max(sqrt(v)))


che può trovare al più un indice (nessuno se v contiene solo numeri negativi).
Ora, se la funzione non trova nulla il codice deve fermarsi, mentre se trova un indice deve proseguire ad applicare la funzione ai due sottovettori che derivano dallo splitting del vettore originario. Lo splitting va effettuato nell'indice appena trovato. Il risultato finale dovrebbe essere un vettore con gli indici del vettore di partenza tali per cui la funzione f() ha dato un risultato. La parte in neretto mi sta creando delle difficoltà, perché ogni volta che vengono generati due sottovettori i loro indici partono sempre da 1. Avevo pensato di "incollare" sotto al vettore di partenza un vettore di indici e far poi agire la funzione sulla prima riga della matrice che ne risulta, ma non mi sembra molto elegante.

Siccome penso che sia un problema già affrontato da altri, vorre evitare di reinventare la ruota e vi chiedo umilmente aiuto.

Grazie!

Risposte
apatriarca
Il funzionamento della tuo algoritmo non mi è del tutto chiaro. Provo a fare un esempio per vedere se ho capito correttamente quello che vuoi fare. Suppongo che la funzione \(f(v)\) sia quella che hai indicato.
Partiamo dal vettore
v = [0.133802, 0.849107, 0.119021, 0.028282, 0.166712, 0.621215, 0.410744]
sqrt(v) = [0.365790, 0.921470, 0.344994, 0.168173, 0.408304, 0.788172, 0.640893]

Il massimo della radice quadrata si ha all'indice due (parto da uno siccome hai parlato di Matlab) e suddividi quindi il tuo array nei due sottoarray
vL = [0.133802]
vR = [0.119021, 0.028282, 0.166712, 0.621215, 0.410744]

E a questo punto richiameresti il tuo algoritmo su questi due sottoarray ottenendo che l'indice nel primo array è uno, mentre quello nel secondo è uguale a quattro. E così via per i sottoarray via via più piccoli. Qual'è però il risultato che viene restituito? E come vengono messi insieme i risultati ottenuti dai due sottoproblemi?

elgiovo
Si, l'esempio è corretto.

Vorrei in uscita un vettore con gli indici del vettore originario per cui f(v) ha trovato un output a seguito di uno splitting.
Uso il tuo esempio per maggiore chiarezza: il primo di questi indici che vorrei in output è 2, derivante dalla prima iterazione.
Alla seconda iterazione, gli indici che vorrei in uscita (da accodare al 2 trovato in precedenza) sono 1 e 6. Infatti, per vL l'indice 1 coincide con l'indice 1 di v, ma per vR l'indice 4, per cui f(vR) ha trovato un output, corrisponde in realtà a 6 in v. Se per qualche motivo il codice dovesse uscire dopo due iterazioni (in questo esempio in realtà continua a iterare su vR visto che ha lunghezza > 1) il vettore che vorrei in uscita sarebbe [2, 1, 6] (o una sua qualche permutazione, tanto poi lo ordino alla fine).

claudio862
Dovrebbe bastare passare l'offset corretto alla funzione, ed incrementarlo dell'indice corrente quando si esplora la sottoparte destra del vettore.

function(vector, offset) =
    i = getIndex(vector)
    left = vector[0 : i-1]
    right = vector[i+1 : end]
    leftIndeces = function(left, offset)
    rightIndeces = function(right, offset + i)
    return [i + offset, leftIndeces, rightIndeces]

L'offset iniziale è ovviamente zero.

apatriarca
Ho l'impressione che la funzione restituisca semplicemente tutti gli indici per cui la tua funzione sia in qualche modo definita. Si arriverà infatti sempre ad un livello della ricorsione in cui il vettore ha lunghezza uno e la tua funzione può restituire o meno questo indice. Se poi ordini in qualche modo gli indici, non vedo quale sia il senso di usare la ricorsione quando una semplice iterazione sarebbe sufficiente. In questo caso basterebbe probabilmente qualcosa come (1:length(v))(v > 0) per ottenere la lista di indici in un ordine diverso da quello ottenuto nella tua funzione.

elgiovo
Aspetta apatriarca, non fossilizzarti sul mio esempio, che era solo una funzioncina banale a scopo illustrativo. La mia funzione agisce su un vettore, ed è piuttosto "raro" che l'esito sia un output, quindi in uscita non avrò tutti gli indici del vettore di partenza, ma un piccolo sottoinsieme di questi. Se analizzando un sottovettore il codice non trova nulla, allora smette di splittarlo. Nel tuo esempio la funzione prima o poi analizzerà uno a uno tutti gli indici, perché i numeri che passi sono tutti positivi. Se ad esempio metti degli intervalli con numeri negativi non è più vero. Mi sembra necessario usare la ricorsione qui, non saprei farlo iterativamente.

apatriarca
Io credo invece che la complessità sarebbe molto inferiore invece. Nell'esempio del massimo ad esempio, anche fossero tutti numeri negativi, hai bisogno di iterare su tutto l'array per trovare il massimo. A meno quindi di riuscire a testare la tua condizione su tutto l'array in tempo sublineare, l'iterazione è meglio.

elgiovo
No. La mia funzione deve trovare dei "salti" bruschi e agisce su vettori abbastanza lunghi, non è di tipo element-wise. L'output della funzione è l'indice del salto più brusco che trova. Per trovare gli altri devo splittare in corrispondenza del salto e agire ricorsivamente nei sottovettori.

A me non sembra fattibile in modo iterativo, tu come lo implementeresti?

apatriarca
Dipende dal risultato che vuoi ottenere dopo l'ordinamento dei tuoi indici. Vuoi ottenere l'elenco di tutti i tuoi salti? Vuoi ottenere i salti più importanti?

Se per trovare il salto più brusco nella tua funzione hai comunque bisogno di iterare su tutto l'array, allora potrebbe essere più efficiente trovare una soluzione iterativa (ma dipende da quello che stai esattamente cercando di fare). Se avessi invece un algoritmo in grado di trovare questo pivot in tempo per esempio logaritmico, il discorso potrebbe essere diverso.

elgiovo
"claudio86":
Dovrebbe bastare passare l'offset corretto alla funzione, ed incrementarlo dell'indice corrente quando si esplora la sottoparte destra del vettore.

function(vector, offset) =
    i = getIndex(vector)
    left = vector[0 : i-1]
    right = vector[i+1 : end]
    leftIndeces = function(left, offset)
    rightIndeces = function(right, offset + i)
    return [i + offset, leftIndeces, rightIndeces]

L'offset iniziale è ovviamente zero.


Questa mi sembra una buona implementazione, grazie. Quindi se v è il vettore di partenza, la chiameresti con

function(v,0)?

Come posso implementare il caso base? Devo stoppare l'iterazione su un sottovettore (ad esempio il Sx) quando al suo interno function() non restituisce nulla e continuarla sull'altro (ad esempio il Dx).

apatriarca
Puoi per esempio far restituire a getIndex -1 (o zero se gli indici partono da 1) quando non trova alcun indice e quindi scrivere
i = getIndex(vector)
if i < 0 return []
...

claudio862
Esattamente come dice apatriarca, nel caso base restituisci una lista vuota, e fai lo stesso se chiami "function" su una lista vuota. Per esempio in Haskell (dove ho chiamato la funzione "explore" invece di "function"):

import Data.List

explore :: ([a] -> Maybe Int) -> [a] -> [Int]
explore f = helper 0
  where
  helper offset xs = case f xs of
    Nothing -> []
    Just i -> (i + offset) : leftIndeces ++ rightIndeces
      where
      left = take i xs
      right = drop (i+1) xs
      leftIndeces = helper offset left
      rightIndeces = helper (offset + i + 1) right

printIndeces :: Show a => [a] -> [Int] -> IO ()
printIndeces _ [] = return ()
printIndeces xs (i:is) = do
  putStrLn ("xs[" ++ show i ++ "] = " ++ show (xs !! i))
  printIndeces xs is

fSqrt :: (Floating a, Ord a, Show a) => [a] -> Maybe Int
fSqrt [] = Nothing
fSqrt xs =
    case maximum ys of
        Nothing -> Nothing
        Just m -> elemIndex (Just m) ys
      where

      ys = map f xs

      f n
        | n >= 0 = Just (sqrt n)
        | otherwise = Nothing

main :: IO ()
main = do
  let f = fSqrt
      xs = [-3, 3, 7, 8, -32, 2, 0] :: [Double]
      is = explore f xs

  putStrLn ("Original list: " ++ show xs)
  putStrLn "Found indeces:"
  printIndeces xs is

(La cosa difficile è stato scrivere l'equivalente di f(v) = indice(max(sqrt(v))), la funzione "fSqrt")
La funzione "explore" fa quello che vuoi tu: prende come argomenti una lista e una funzione che data una sottolista restituisce un indice o niente, e restituisce una lista di indici.

Output:

Original list: [-3.0,3.0,7.0,8.0,-32.0,2.0,0.0]
Found indeces:
xs[3] = 8.0
xs[2] = 7.0
xs[1] = 3.0
xs[5] = 2.0
xs[6] = 0.0


Comunque prima c'era un errore, dovrebbe essere:

function(vector, offset) =
    i = getIndex(vector)
    left = vector[0 : i-1]
    right = vector[i+1 : end]
    leftIndeces = function(left, offset)
    rightIndeces = function(right, offset + i + 1)
    return [i + offset, leftIndeces, rightIndeces]

elgiovo
Grazie a entrambi!

claudio86, mi spiace di averti fatto scrivere programmi (tra l'altro è la prima volta che vedo un programma in Haskell :shock: ), comunque è stato utile.

Già che sono qua, la funzione che individua i salti applica un'analisi basata su carte cumulative + bootstrapping. Se per caso ve ne intendete e avete qualche hint per me (tipo come tarare i parametri), battete un colpo! :D

claudio862
"elgiovo":
Grazie a entrambi!

claudio86, mi spiace di averti fatto scrivere programmi (tra l'altro è la prima volta che vedo un programma in Haskell :shock: ), comunque è stato utile.

Beh, erano poco più di una quarantina di linee, giusto per controllare se avevo sbagliato qualcosa (infatti mi ero perso un +1). La funzione "explore" poi è praticamente identica a quella che avevo scritto in pseudocodice prima. L'unica cosa che mi ha portato via qualche minuto è stata "fSqrt". Che in effetti potrebbe essere scritta in modo un po' più idiomatico (giusto per completezza):

fSqrt :: (Floating a, Ord a) => [a] -> Maybe Int
fSqrt [] = Nothing
fSqrt xs = do
    let ys = map f xs
    m <- maximum ys
    elemIndex (Just m) ys
  where
  f n
    | n >= 0 = Just (sqrt n)
    | otherwise = Nothing

apatriarca
Nonostante sia un grande sostenitore di haskell, credo che in questo caso non sia il miglior linguaggio in cui mostrare questo algoritmo. Usando le liste è infatti impossibile accedere in modo casuale agli elementi della lista e si è quindi costretti ad iterare più volte su di essa. Siccome guardando il tuo codice mi erano però venute alcune idee per implementarlo in modo alternativo, ho provato a implementarne una mia versione. È certamente più difficile da comprendere se non si conosce haskell, ma a mio parere più idiomatico. Ovviamente sarebbe possibile lavorare anche con gli array, ma è in generale più complicato e scomodo.
import Data.List
import Data.Function
import Control.Arrow

algo :: Ord b => (a -> b) -> (a -> Bool) -> [a] -> [Int]
algo _ _ [] = []
algo f inDom xs = algo' f $ filter (inDom . fst) $ zip xs [0..]

algo' :: Ord b => (a -> b) -> [(a, Int)] -> [Int]
algo' _ [] = []
algo' f xs = idx : concatMap (algo' f) [xL, xR]
  where idx = snd $ maximumBy (compare `on` (f . fst)) xs
        (xL, xR) = (takeWhile ((<idx) . snd) &&& dropWhile ((<=idx) . snd)) xs

main :: IO ()
main = do
    putStr "Original list: "
    print xs
    putStr "Result: "
    print $ algo sqrt (>=0.0) xs
  where xs = [-3, 3, 7, 8, -32, 2, 0] :: [Double]


@elgiovo: non conosco l'algoritmo di cui parli. Sapresti darmi qualche riferimento? Ma è un qualche tipo di algoritmo probabilistico o è deterministico?

elgiovo

@elgiovo: non conosco l'algoritmo di cui parli. Sapresti darmi qualche riferimento?


Certo.

Qui trovi una descrizione semplice ma efficace.
Qui c'è un'applicazione del metodo nel mio settore di ricerca (è un'articolo IEEE, non so se riesci ad accedere)

In parole povere: si prende una serie temporale, si misura in qualche modo (carte cumulative) la variazione al suo interno, poi si fa la stessa cosa con le versioni scrambled della stessa serie (ottenute tramite bootstrapping) e con un criterio di confronto si decide se e dove c'è stato un cambiamento.
Matlab viene incontro con la funzione bootstrp. Attualmente però sto avendo qualche problemino con il mio codice. Ad esempio, se la serie è troppo "frastagliata" ho in uscita l'indice 1 e alla successiva iterazione il codice si impalla.

apatriarca
Ma quindi calcoli tutte queste cose ad ogni livello della ricorsione? Oppure le calcoli solo all'inizio e poi usi l'algoritmo solo per cercare i punti di variazione? Da quello che ho letto non mi è in effetti chiaro quale sia il vantaggio di un approccio divide et impera contro un semplice filtraggio dei dati basato su una condizione. Eventualmente seguito da un ordinamento per ottenerli in ordine di significatività. Ma forse mi sbaglio. Ho dato solo una lettura molto veloce del primo riferimento che mi hai dato.

elgiovo
Si, lo applico ad ogni livello. Questo perché ad ogni passata l'algoritmo trova solo il punto di cambiamento più significativo. Per trovare gli altri bisogna agire sul vettore destro e quello sinistro. Per come è implementato l'algoritmo, secondo me è impossibile farlo in maniera iterativa. Tra l'altro, in entrambi i riferimenti, si dice chiaramente di usare l'approccio divide & conquer.
Ho provato tantissimi tipi di filtraggio, ma ti assicuro che non è un'operazione semplice e comunque non dà risultati soddisfacenti. Questo algoritmo, quando mi funziona, mi sembra appropriato al mio scopo.

apatriarca
Rileggendo l'articolo mi rendo in effetti conto che non viene fornito un criterio per estrarre i punti di cambiamento, piuttosto uno stimatore per il punto di cambiamento più probabile e significativo. Per poter quindi trovare gli altri punti di cambiamento è quindi in effetti necessario analizzare separatamente i dati che precedono e seguono questo punto di cambiamento. Per cui è necessario usare un metodo come quello che hai descritto.

apatriarca
Seguendo la prima pagina web che mi hai linkato ho cercato di riprodurre i loro risultati (più o meno) con un programma in C.
#include <float.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>


/* Recursive change point analysis using CUSUM estimator */
unsigned change_point_analysis(double *data_in, unsigned size, unsigned offset, unsigned *indices_out, double *confidences)
{
    if (size == 0) { return 0; }

    /* Finds average */
    double average = 0.0;
    for (unsigned i = 0; i < size; ++i) {
        average += data_in[i];
    }
    average /= size;

    /* Finds minimum and maximum of CUSUM chart */
    double S = 0.0, Smin = 0.0, Smax = 0.0, Sdiff = 0.0;
    int max_index = -1;
    int min_index = -1;
    for (unsigned i = 0; i < size; ++i) {
        S += data_in[i] - average;
        if (S < Smin) {
            Smin = S;
            min_index = (int)i;
        }
        if (S > Smax) {
            Smax = S;
            max_index = (int)i;
        }
    }
    Sdiff = Smax - Smin;
    unsigned index = fabs(Smin) < fabs(Smax) ? max_index : min_index;

    /* Bootstrapping */
    srand((unsigned int)time(NULL));
    double * bootstrap_array = (double *)malloc(sizeof(double)*size);
    memcpy(bootstrap_array, data_in, sizeof(double)*size);

    double confidence = 0.0;
    for (int k = 0; k < 10000; ++k) {
        /* Random permutation */
        for (unsigned i = 0; i < size; ++i) {
            int j = rand() % (i + 1);
            double tmp = bootstrap_array[i];
            bootstrap_array[i] = bootstrap_array[j];
            bootstrap_array[j] = tmp;
        }

        /* CUSUM chart computation */
        double Sb = 0.0, Sbmin = 0.0, Sbmax = 0.0, Sbdiff = 0.0;
        for (unsigned i = 0; i < size; ++i) {
            Sb += bootstrap_array[i] - average;
            Sbmin = Sb < Sbmin ? Sb : Sbmin;
            Sbmax = Sb > Sbmax ? Sb : Sbmax;
        }
        Sbdiff = Sbmax - Sbmin;
        if (Sbdiff < Sdiff) { ++confidence; }
    }
    confidence /= 10000.0;

    free(bootstrap_array);
    if (confidence < 0.9) { /* Voglio confidenza di almeno il 90% */
        return 0;
    }

    /* Output and recursion */
    indices_out[0] = index + offset;
    confidences[0] = confidence;
    unsigned ret = 1;
    ret += change_point_analysis(data_in, index, offset, indices_out+ret, confidences+ret);
    ret += change_point_analysis(data_in+index+1, size-index-1, offset+index+1, indices_out+ret, confidences+ret);
    return ret;
}

int main(void)
{
    double data[24] = {10.7, 13.0, 11.4, 11.5, 12.5, 14.1, 14.8, 14.1, 12.6, 16.0, 11.7, 10.6,
                       10.0, 11.4,  7.9,  9.5,  8.0, 11.8, 10.5, 11.2,  9.2, 10.1, 10.4, 10.5};
    unsigned change_points[24] = {0};
    double confidences[24] = {0.0};

    unsigned n = change_point_analysis(data, 24, 1, change_points, confidences);
    for (int i = 0; i < n; ++i) {
        printf("%d. Change-point at index %2d with confidence %6.2f%%\n", i+1, change_points[i], confidences[i]*100.0); 
    }
}

Il risultato che ho ottenuto è infatti
1. Change-point at index 11 with confidence  99.98%
2. Change-point at index  5 with confidence  91.52%

Il risultato sembrerebbe essere abbastanza in linea con l'articolo. Soprattutto se si considera che lui ha usato uno stimatore diverso. L'ho scritto in C perché non avevo matlab sotto mano. Essendo in C99 ha bisogno di un compilatore che supporti tale standard (in alternativa dovrebbe essere un codice C++ valido). Manca la verifica degli errori (per esempio non verifica se l'allocazione di memoria è andata a buon fine e da per scontato che gli argomenti passati siano validi).

EDIT: Aggiunto test sulla dimensione.
EDIT2: Trovato un altro bug.

elgiovo
Uh, che gentile!
Guarda, allora ti copio anche il mio codice Matlab che implementa l'articolo:

Questa è la funzione che trova il salto più significativo (ha due sottofunzioni):

function stepind = step_detect_one(x)
    
    stepind = [];

        % Sdiff of data
        [S smaxind] = sdiff(x);

        % select the number of bootstrap samples and perform bootstrap analysis
        nboots = 1000;
        
        % uncomment to use Matlab function bootstrp
        %options = statset('UseParallel','always');
        %bootsdiff = bootstrp(nboots,@sdiff,x,'Options',options);
        
        % uncomment to use custom bootstrap
        bootsdiff = zeros(1,nboots);
        for j = 1:nboots
            bootsdiff(j) = sdiff(x(randperm(length(x))));
        end

        % number of bootstraps with S0diff < Sdiff and confidence level
        X = sum(find(bootsdiff < S));
        clevel = X/nboots;
        
        % threshold for confidence level
        th = 0.95;
        if clevel > th

            % evaluate the MSE estimator
            stepind = minMSE(x);
            
            % or use the CSUM estimator
            %stepind = smaxind;

        end
    

end


function [sdiff smaxind] = sdiff(x)

    % CUSUM of data
    csums = zeros(1,length(x));
    xmean = mean(x);

    % calculate cumulative sums
    for i = 2:length(x)

        csums(i) = csums(i - 1) + (x(i) - xmean);

    end

    % estimator for the step amplitude
    sdiff = max(csums) - min(csums);
    % estimator for the change index
    [smax smaxind] = max(abs(csums));

end

function minMSE = minMSE(x)
    
    MSE = zeros(1,length(x));
    
    for m = 1:length(x)
        MSE(m) = var(x(1:m)) + var(x(m+1:end));
    end
    
    [minvalue minMSE] = min(MSE);

end


Questa invece è la funzione che implementa il divide & conquer:

function stepind = step_detect(x,offset)

    i = step_detect_one(x);

    if ~isempty(i)

        left = x(1:i - 1);
        right = x(i + 1:end);
        
        if length(left) > 1
            leftIndices = step_detect(left,offset);
        else leftIndices = [];
        end
        
        if length(right) > 1
            rightIndices = step_detect(right,offset + i + 1);
        else rightIndices = [];
        end
        

        stepind = [i + offset, leftIndices rightIndices];
        
    else stepind = [];

    end
        
end


La magagna si trova nella prima funzione, perché dà quel problema di cui ti parlavo.
Non ho provato con i dati dell'articolo, ma sono piuttosto sicuro di averla implementata in modo giusto.
Ti linko il file di dati "problematico" che sto usando come test-case.

http://dfiles.eu/files/0eho3xhi2

Rispondi
Per rispondere a questa discussione devi prima effettuare il login.