a
    ^gD                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlZd dlZd dlm	Z	 d dl
Z
d dlZd dlZd dlZd dlmZ G dd dZG dd dZd	ejejd d d
dddddd ddfddZdS )    N)
ThreadPool)Queue)get_invlistc                   @   sz   e Zd ZdZdddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd ZdS ) BigBatchSearcherz
    Object that manages all the data related to the computation
    except the actual within-bucket matching and the organization of the
    computation (parallel or not)
    r   Fc                 C   s`   || _ g | _|| _|| _|| _t|j}tjt	|||d| _
dgd | _t  | _| _d S )N)keep_maxr      )verbosetictocxqindexuse_float16faissZis_similarity_metricmetric_typeZ
ResultHeaplenrht_accutime	t_displayt0)selfr   r
   kr   r   r    r   l/var/www/html/cobodadashboardai.evdpl.com/venv/lib/python3.9/site-packages/faiss/contrib/big_batch_search.py__init__   s    zBigBatchSearcher.__init__c                 C   s   t   | _d S N)r   	t_accu_t0r   r   r   r   start_t_accu0   s    zBigBatchSearcher.start_t_accuc                 C   s    | j |  t | j 7  < d S r   )r   r   r   )r   nr   r   r   stop_t_accu3   s    zBigBatchSearcher.stop_t_accuc                 C   s*   |t   f| _| jdkr&t|ddd d S )Nr   Tendflush)r   r	   r   print)r   namer   r   r   tic6   s    
zBigBatchSearcher.ticc                 C   s:   | j \}}t | }| jdkr6t| d|dd |S )Nr   z: .3fz s)r	   r   r   r$   )r   r%   r   dtr   r   r   toc;   s
    

zBigBatchSearcher.tocc                 C   s  | j dks.| j dkr2|dkr2t | jd k r2d S t | j }td|dd| d| jj d	| jd
 dd| jd dd| jd dd| jd dd| jd dd| jd ddtj	|| jj |d  | d dt
  | j dkrdnddd t | _d S )N      i  g      ?[z.1fz	 s] list /z times prep q r   r'   z prep b z comp z res    z	 wait in    z
 wait out    z eta )secondsz mem r    
Tr!   )r   r   r   r   r$   r   nlistr   datetime	timedeltar   Zget_mem_usage_kb)r   ltr   r   r   reportB   s>    






zBigBatchSearcher.reportc                 C   s   |  d d}t| j}tj|| jjfdd}td||D ]@}t||| }| jj	
| j|| | jj\}}||||< q:|   || _d S )Nzcoarse quantizationi   int32)dtyper   )r&   r   r
   npemptyr   nproberangemin	quantizersearchr)   q_assign)r   bsZnqrB   i0i1Zq_dis_iZ
q_assign_ir   r   r   coarse_quantizationW   s    

z$BigBatchSearcher.coarse_quantizationc                 C   sz   |  d | j}|d7 }tj| j| jjd dd| _| j | _| j	dkrZt
d| jd  | jdd  | _| `|   d S )Nzbucket sortr*      )Znbucketntr   z  number of -1s:)r&   rB   r   Zmatrix_bucket_sort_inplacer   r3   bucket_limsravel	query_idsr   r$   r)   )r   rB   r   r   r   reorder_assigne   s    

zBigBatchSearcher.reorder_assignc                 C   s   t   }| j}| j| | j|d   }}| j|| }| j| }| jrV||j| }t   }t|j	|\}	}
| j
du r|
 }
n
| 
|
}
| jr|
d}
|d}t   }| jd  || 7  < | jd  || 7  < |||	|
fS )z4 prepare the queries and database items for bucket lr*   NZfloat16r   )r   r   rI   rK   r
   by_residualr@   Zreconstructr   Zinvlistsdecode_funcrJ   r   Zastyper   )r   r6   r   r   rD   rE   q_subsetxq_lt1list_idsxb_lt2r   r   r   prepare_bucketr   s&    





zBigBatchSearcher.prepare_bucketc                 C   sX   |du rdS t   }|du r"|}n|| }| j||| | jd  t   | 7  < dS )z,add the bucket results to the heap structureNr.   )r   r   Zadd_result_subsetr   )r   rO   DrR   Ir   r   r   r   add_results_to_heap   s    z$BigBatchSearcher.add_results_to_heapc                 C   s   | j j| jj| jjfS r   )r
   shaper   r=   r3   r   r   r   r   sizes_in_checkpoint   s    z$BigBatchSearcher.sizes_in_checkpointc                 C   sh   |d }t |d6}t|  || jj| jjfd|d W d    n1 sN0    Y  t|| d S )Nz.tmpwb)sizes	completedr   )	openpickledumprZ   r   rV   rW   osreplace)r   fnamer]   Ztmpnamefr   r   r   write_checkpoint   s    "z!BigBatchSearcher.write_checkpointc                 C   s   t |d}t|}W d    n1 s*0    Y  |d |  ksHJ |d d | jjd d < |d d | jjd d < |d S )Nrbr\   r   r   r*   r]   )r_   r`   loadrZ   r   rV   rW   )r   rd   re   Zckpr   r   r   read_checkpoint   s    (z BigBatchSearcher.read_checkpointN)r   F)__name__
__module____qualname____doc__r   r   r   r&   r)   r8   rF   rL   rU   rX   rZ   rf   ri   r   r   r   r   r      s    	  
r   c                   @   s,   e Zd ZdZdejejfddZdd ZdS )BlockComputerz computation within one bucket knn_functionc                 C   s   || _ |jtjkr0t|j|j}dd }d}n|jtjkrvt|j|j	j
|j	j|j}|j	|_	|j	j}d|_|j}nR|jtjkrt|j|jj|j}|j|_|jj}d|_|j}ntd|j d|| _|dkrd n|| _|| _|| _|| _|| _d S )Nc                 S   s
   |  dS )Nfloat32)view)xr   r   r   <lambda>       z(BlockComputer.__init__.<locals>.<lambda>FTzindex type z not supportedr   )r   	__class__r   ZIndexIVFFlatZ	IndexFlatdr   Z
IndexIVFPQZIndexPQZpqMnbitsdecodeZ
is_trainedrM   ZIndexIVFScalarQuantizerZIndexScalarQuantizersqZqtypeRuntimeError
index_helprN   methodpairwise_distancesknn)r   r   r}   r~   r   r|   rN   rM   r   r   r   r      s8    zBlockComputer.__init__c           	      K   s   | j j}|jdks|jdkr&d  }}n| jdkr`t|| jj t|| j_	| j
||\}}nH| jdkr| j|||d}d }n(| jdkr| j|||fd|i|\}}||fS )Nr   r   r~   )metricro   r   )r   r   sizer}   r   Zcopy_array_to_vectorr|   codesr   ZntotalrA   r~   r   )	r   rP   rS   rR   r   
extra_argsr   rV   rW   r   r   r   block_search   s    



zBlockComputer.block_searchN)	rj   rk   rl   rm   r   r~   r   r   r   r   r   r   r   rn      s   
#rn   ro   Fr*   i   r^   c           1         s`  j }|dv sJ |j}t|| tdj }t| tdjtdj  }|| | }|dkrtd| d| d| d	| d
|d dd t|||d t	|||dj
 _
j _|du r܈   n| _   |du rj}t }|durh||fdjfks J tj|r^td|   |}tdt|  n
td |dkrt||D ]h} |  |\}}}}t }|||\}} jd  t | 7  <  |||| q|nJ|dkr fdd} |} d}!td}"t||D ]|} | |"||!|d f}#| \}}}}   |||\}} d ||||f}!   |#  }  d q" j|!  |"!  n~dd fdd}$d' fdd	}%fdd }&t"d}'t"d}(|$|&dt |(|'})|$|%|	||||'d}*t }+td! |(  },|,sFq$|,\}-}.}/}0}}}}|-|krldd   jd  |07  <  jd  |.7  <  jd"  |/7  < td#|-   |||| td$|-  |#|-  |- |dur*t |+ |kr*td%  $|| t }+q*|*%  |)%   &d&  j'(   )   j'j* j'j+fS )(a  
    Search queries xq in the IVF index, with a search function that collects
    batches of query vectors per inverted list. This can be faster than the
    regular search indexes.
    Supports IVFFlat, IVFPQ and IVFScalarQuantizer.

    Supports three computation methods:
    method = "index":
        build a flat index and populate it separately for each index
    method = "pairwise_distances":
        decompress codes and compute all pairwise distances for the queries
        and index and add result to heap
    method = "knn_function":
        decompress codes and compute knn results for the queries

    threaded=0: sequential execution
    threaded=1: prefetch next bucket while computing the current one
    threaded=2: prefetch prefetch_threads buckets at a time.

    compute_threads>1: the knn function will get an additional thread_no that
        tells which worker should handle this.

    In threaded mode, the computation is tiled with the bucket perparation and
    the writeback of results (useful to maximize GPU utilization).

    use_float16: convert all matrices to float16 (faster for GPU gemm)

    q_assign: override coarse assignment, should be a matrix of size nq * nprobe

    checkpointing (only for threaded > 1):
    checkpoint: file where the checkpoints are stored
    checkpoint_freq: when to perform checkpoinging. Should be a multiple of threaded

    start_list, end_list: process only a subset of invlists
    )r   r~   ro   r9   Zint64rp   r   zmemory: queries z assign z result z total z = i   @r'   z GiB)r   r   )r}   r~   r   Nzrecovering checkpoint: z   already completed: z$no checkpoint: starting from scratchr+   r*   c                    s*   | dur j |   |jk r& |S dS )z` perform the addition for the previous bucket and
            prefetch the next (if applicable) N)rX   r3   rU   )to_addr6   )bbsr   r   r   add_results_and_prefetcha  s    

z2big_batch_search.<locals>.add_results_and_prefetchr/   c           	         s   zxt |R fddt||D }|D ]}|  q0    W d    n1 sb0    Y  d  W n   t  t	   Y n0 d S )Nc                    s(   g | ] }| vrj |fd qS ))args)apply_async).0ir]   input_queueoutput_queuepooltaskr   r   
<listcomp>  s   zAbig_batch_search.<locals>.task_manager_thread.<locals>.<listcomp>)
r   r>   getclosejoinput	traceback	print_exc_threadinterrupt_main)	r   Z	pool_size
start_taskZend_taskr]   r   r   resrr   r   r   task_manager_thread~  s    	

&z-big_batch_search.<locals>.task_manager_threadc                     s    t j | d}d|_|  |S )N)targetr   T)	threadingThreaddaemonstart)r   task_manager)r   r   r   r     s    z&big_batch_search.<locals>.task_managerc                    sn   zJt d|    | \}}}}|| ||||f t d|   W n   t  t   Y n0 d S )NzPrepare start: zPrepare end: )logginginforU   r   r   r   r   r   )task_idr   r   rO   rP   rR   rS   )r   r   r   prepare_task  s    z&big_batch_search.<locals>.prepare_taskc              
      sD  zt d|   d}t }t d|   | }t | }|d u rZ|d  q|\}}}	}
}t d|  d|  t }dkr j|	||
| d\}}n |	||
\}}t | }t d|  d|  t }||||||||
|f t | }qt d	|   W n   t  t	   Y n0 d S )
NzCompute start: r   zCompute input: task zCompute work: task z, centroid r*   )	thread_idzCompute output: task zCompute end: )
r   r   r   r   r   r   r   r   r   r   )r   r   r   
t_wait_outr   Zinput_value	t_wait_incentroidrO   rP   rR   rS   rV   rW   	t_compute)compcomputation_threadsr   r   r   compute_task  s<    

z&big_batch_search.<locals>.compute_taskzWaiting for resultr0   zAdding to heap start: centroid zAdding to heap end: centroid zwriting checkpointzfinalize heap)N),r=   nbytesr   r;   r:   itemsizer   r   r   rn   rN   rM   rF   rB   rL   r3   setrb   pathexistsri   r>   r8   rU   r   r   r   rX   r   r   r   r   r   r   r   addrf   r   r&   r   finalizer)   rV   rW   )1r   r
   r   r}   r~   r   r   Zthreadedr   Zprefetch_threadsr   rB   
checkpointZcheckpoint_freq
start_listZend_listZcrash_atr=   Zmem_queriesZ
mem_assignZmem_resZmem_totr]   r6   rO   rP   rR   rS   Zt0irV   rW   r   Zprefetched_bucketr   r   Zprefetched_bucket_ar   r   r   Zprepare_to_compute_queueZcompute_to_main_queueZcompute_task_managerZprepare_task_managerZt_checkpointvaluer   r   r   r   r   )r   r   r   r   r   r   r   big_batch_search   s    4













	#	








r   )r   r`   rb   r   Zmultiprocessing.poolr   r   r   queuer   r   r4   numpyr;   r   Zfaiss.contrib.inspect_toolsr   r   rn   r~   r   r   r   r   r   r   <module>   s<    :